diff --git a/README.md b/README.md index f207cab..95e8610 100644 --- a/README.md +++ b/README.md @@ -115,33 +115,33 @@ Test the `Put()` API: ```sh # Open a terminal and run: -$ kubepfm --target deployment/hedgedemo:8081:8081 +$ kubepfm --target deployment/hedgedemo:9090:9090 # Open another terminal and run: -$ curl localhost:8081/put -d "samplekey samplevalue" +$ curl localhost:9090/put -d "samplekey samplevalue" # To ensure a non-leader sender, you can also specify a # non-leader pod for the kubepfm command above: -$ kubepfm --target hedgedemo-6b5bcd4998-n95n7:8081:8081 +$ kubepfm --target hedgedemo-6b5bcd4998-n95n7:9090:9090 ``` Test the `Get()` API: ```sh # While kubepfm is running on a different terminal, run: -$ curl localhost:8081/get -d "samplekey" +$ curl localhost:9090/get -d "samplekey" ``` Test the `Send()` API: ```sh # While kubepfm is running on a different terminal, run: -$ curl localhost:8081/send -d "hello-world" +$ curl localhost:9090/send -d "hello-world" ``` Test the `Broadcast()` API: ```sh # While kubepfm is running on a different terminal, run: -$ curl localhost:8081/broadcast -d "hello-all" +$ curl localhost:9090/broadcast -d "hello-all" ``` diff --git a/cmd/demo/main.go b/cmd/demo/main.go index d4e6ac5..8bb6f45 100644 --- a/cmd/demo/main.go +++ b/cmd/demo/main.go @@ -2,11 +2,12 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "io" "log" - "math/rand" + "log/slog" "net/http" "os" "os/signal" @@ -16,6 +17,7 @@ import ( "cloud.google.com/go/spanner" "github.com/flowerinthenight/hedge" + protov1 "github.com/flowerinthenight/hedge/proto/v1" ) var ( @@ -27,13 +29,30 @@ var ( func main() { flag.Parse() - rand.Seed(time.Now().UnixNano()) - client, err := spanner.NewClient(context.Background(), *dbstr) + ctx, cancel := context.WithCancel(context.Background()) + client, err := spanner.NewClient(ctx, *dbstr) if err != nil { - log.Println(err) + slog.Error("NewClient failed:", "err", err) return } + in := make(chan *hedge.StreamMessage) + out := make(chan *hedge.StreamMessage) + go func(_ctx context.Context) { + for { + select { + case <-_ctx.Done(): + return + case m := <-in: + b, _ := json.Marshal(m) + slog.Info("input stream:", "val", string(b)) + out <- &hedge.StreamMessage{Payload: &protov1.Payload{Data: []byte("one")}} + out <- &hedge.StreamMessage{Payload: &protov1.Payload{Data: []byte("two")}} + out <- nil // end + } + } + }(context.WithValue(ctx, struct{}{}, nil)) + defer client.Close() op := hedge.New(client, ":8080", *spindleTable, *lockName, *logTable, hedge.WithGroupSyncInterval(time.Second*5), @@ -95,10 +114,10 @@ func main() { // return nil, nil }, ), + hedge.WithLeaderStreamChannels(in, out), ) log.Println(op) - ctx, cancel := context.WithCancel(context.Background()) done := make(chan error, 1) go op.Run(ctx, done) @@ -171,6 +190,22 @@ func main() { w.Write([]byte(out)) }) + mux.HandleFunc("/streamsend", func(w http.ResponseWriter, r *http.Request) { + in, out, err := op.StreamToLeader(context.Background()) + if err != nil { + w.Write([]byte(err.Error())) + return + } + + in <- &hedge.StreamMessage{Payload: &protov1.Payload{Data: []byte("test")}} + close(in) // we're done with input + for m := range out { + slog.Info("reply:", "out", string(m.Payload.Data)) + } + + w.Write([]byte("OK")) + }) + mux.HandleFunc("/broadcast", func(w http.ResponseWriter, r *http.Request) { hostname, _ := os.Hostname() msg := "hello" // default @@ -211,7 +246,7 @@ func main() { w.Write([]byte(strings.Join(outs, "\n"))) }) - s := &http.Server{Addr: ":8081", Handler: mux} + s := &http.Server{Addr: ":9090", Handler: mux} go s.ListenAndServe() // Interrupt handler. diff --git a/grpcsvc.go b/grpcsvc.go new file mode 100644 index 0000000..6ebf566 --- /dev/null +++ b/grpcsvc.go @@ -0,0 +1,65 @@ +package hedge + +import ( + "io" + "sync" + + protov1 "github.com/flowerinthenight/hedge/proto/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type service struct { + op *Op + + protov1.UnimplementedHedgeServer +} + +func (s *service) Send(hs protov1.Hedge_SendServer) error { + ctx := hs.Context() + var w sync.WaitGroup + w.Add(1) + go func() { + defer w.Done() + for { + select { + case <-ctx.Done(): + return + default: + } + + in, err := hs.Recv() + if err == io.EOF { + return + } + + s.op.streamIn <- &StreamMessage{Payload: in} + } + }() + + w.Add(1) + go func() { + defer w.Done() + for { + select { + case <-ctx.Done(): + return + default: + } + + out := <-s.op.streamOut + if out == nil { + return + } + + hs.Send(out.Payload) + } + }() + + w.Wait() + return nil +} + +func (s *service) Broadcast(hs protov1.Hedge_BroadcastServer) error { + return status.Errorf(codes.Unimplemented, "method Broadcast not implemented") +} diff --git a/hedge.go b/hedge.go index 26a1598..6af7fcf 100644 --- a/hedge.go +++ b/hedge.go @@ -6,19 +6,25 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" "log" "net" "os" + "strconv" "strings" "sync" "sync/atomic" "time" "cloud.google.com/go/spanner" + protov1 "github.com/flowerinthenight/hedge/proto/v1" "github.com/flowerinthenight/spindle" "github.com/google/uuid" "github.com/hashicorp/memberlist" "google.golang.org/api/iterator" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/reflection" ) const ( @@ -143,9 +149,42 @@ func (w withLogger) Apply(op *Op) { op.logger = w.l } // log.New(ioutil.Discard, "", 0) func WithLogger(v *log.Logger) Option { return withLogger{v} } +type withGrpcHostPort string + +func (w withGrpcHostPort) Apply(op *Op) { op.grpcHostPort = string(w) } + +// WithGrpcHostPort sets Op's internal grpc host/port address. +// Defaults to the internal TCP host:port+1. +func WithGrpcHostPort(v string) Option { return withGrpcHostPort(v) } + +type StreamMessage struct { + Payload *protov1.Payload + Error error +} + +type withLeaderStreamChannels struct { + in chan *StreamMessage + out chan *StreamMessage +} + +func (w withLeaderStreamChannels) Apply(op *Op) { + op.streamIn = w.in + op.streamOut = w.out +} + +// WithLeaderStreamChannels sets the streaming input and output channels for sending +// streaming messages to the leader. All incoming stream messages to the leader will +// be sent to the `in` channel. A nil message indicates the end of the streaming data. +// After sending all messages to `in`, the handler will then listen to the `out` channel +// for reply messages. A nil message indicates the end of the reply stream. +func WithLeaderStreamChannels(in chan *StreamMessage, out chan *StreamMessage) Option { + return withLeaderStreamChannels{in, out} +} + // Op is our main instance for hedge operations. type Op struct { hostPort string // this instance's id; address:port + grpcHostPort string // default is host:port+1 (from `hostPort`) spannerClient *spanner.Client // both for spindle and hedge lockTable string // spindle lock table lockName string // spindle lock name @@ -156,6 +195,8 @@ type Op struct { fnLdrData interface{} // arbitrary data passed to fnLeader fnBroadcast FnMsgHandler // broadcast message handler fnBcData interface{} // arbitrary data passed to fnBroadcast + streamIn chan *StreamMessage + streamOut chan *StreamMessage *spindle.Lock // handles our distributed lock members map[string]struct{} // key=id @@ -225,19 +266,19 @@ func (op *Op) Run(ctx context.Context, done ...chan error) error { return err } - listener, err := net.ListenTCP("tcp", addr) + tl, err := net.ListenTCP("tcp", addr) if err != nil { return err } - defer listener.Close() + defer tl.Close() op.logger.Printf("tcp: listen on %v", op.hostPort) go func() { for { - conn, err := listener.Accept() + conn, err := tl.Accept() if err != nil { - op.logger.Printf("listener.Accept failed: %v", err) + op.logger.Printf("Accept failed: %v", err) return } @@ -250,6 +291,20 @@ func (op *Op) Run(ctx context.Context, done ...chan error) error { } }() + gl, err := net.Listen("tcp", op.grpcHostPort) + if err != nil { + return err + } + + defer gl.Close() + op.logger.Printf("grpc: listen on %v", op.grpcHostPort) + + gs := grpc.NewServer() + svc := &service{op: op} + protov1.RegisterHedgeServer(gs, svc) + reflection.Register(gs) // register reflection service + go gs.Serve(gl) + // Setup and start our internal spindle object. op.Lock = spindle.New( op.spannerClient, @@ -419,6 +474,7 @@ func (op *Op) Run(ctx context.Context, done ...chan error) error { <-ctx.Done() // wait for termination + gs.GracefulStop() // stop grpc server if atomic.LoadInt32(&op.ensureOn) == 1 { op.ensureCancel() // stop semaphore checker; <-op.ensureDone // and wait @@ -659,6 +715,79 @@ func (op *Op) Send(ctx context.Context, msg []byte) ([]byte, error) { return nil, fmt.Errorf(string(b)) } +// StreamToLeader returns an input and output channels for streaming to leader. To use the channels, +// send your request message(s) to the input channel, close it (i.e. close(input)), then read the +// replies from the output channel. This function will close the output channel when done. +// +// StreamToLeader is sequential in the sense that you need to send all your input messages first +// before getting any response from the leader. +func (op *Op) StreamToLeader(ctx context.Context) (chan *StreamMessage, chan *StreamMessage, error) { + if op.streamIn == nil || op.streamOut == nil { + return nil, nil, fmt.Errorf("hedge: input/output channel(s) cannot be nil") + } + + conn, err := op.getLeaderGrpcConn(ctx) + if err != nil { + return nil, nil, err + } + + client := protov1.NewHedgeClient(conn) + stream, err := client.Send(ctx) + if err != nil { + return nil, nil, err + } + + keyId := "id" + id := uuid.NewString() + in := make(chan *StreamMessage) + out := make(chan *StreamMessage) + reply := make(chan error) + go func() { + var err error + for m := range in { + if m.Payload.Meta == nil { + m.Payload.Meta = map[string]string{keyId: id} + } else { + if _, ok := m.Payload.Meta[keyId]; !ok { + m.Payload.Meta[keyId] = id + } + } + + err = stream.Send(m.Payload) + if err != nil { + break + } + } + + stream.CloseSend() + reply <- err + }() + + go func() { + defer func() { + close(out) + conn.Close() + }() + + err := <-reply + if err != nil { + out <- &StreamMessage{Error: err} + return + } + + for { + resp, err := stream.Recv() + if err == io.EOF { + return + } + + out <- &StreamMessage{Payload: resp} + } + }() + + return in, out, nil +} + type BroadcastOutput struct { Id string `json:"id,omitempty"` Reply []byte `json:"reply,omitempty"` @@ -888,6 +1017,80 @@ func (op *Op) getLeaderConn(ctx context.Context) (net.Conn, error) { return conn, nil } +// Don't forget to close the returned connection. +func (op *Op) getLeaderGrpcConn(ctx context.Context) (*grpc.ClientConn, error) { + var conn *grpc.ClientConn + var err error + subctx := context.WithValue(ctx, struct{}{}, nil) + first := make(chan struct{}, 1) + first <- struct{}{} // immediately the first time + tcnt, tlimit := int64(0), (op.lockTimeout/2000)*2 + ticker := time.NewTicker(time.Second * 2) // processing can be more than this + defer ticker.Stop() + + var active int32 + getConn := func() (*grpc.ClientConn, error) { + atomic.StoreInt32(&active, 1) + defer atomic.StoreInt32(&active, 0) + leader, err := op.Leader() + if err != nil { + return nil, err + } + + if leader == "" { + return nil, ErrNoLeader + } + + h, _, _ := net.SplitHostPort(leader) + _, gp, _ := net.SplitHostPort(op.grpcHostPort) + gleader := net.JoinHostPort(h, gp) + + var opts []grpc.DialOption + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + lconn, err := grpc.NewClient(gleader, opts...) + if err != nil { + return nil, err + } + + return lconn, nil + } + + type connT struct { + conn *grpc.ClientConn + err error + } + + for { + select { + case <-subctx.Done(): + return nil, context.Canceled + case <-first: + case <-ticker.C: + } + + if atomic.LoadInt32(&active) == 1 { + continue + } + + ch := make(chan connT, 1) + go func() { + c, e := getConn() + ch <- connT{c, e} + }() + + res := <-ch + conn = res.conn + err = res.err + + tcnt++ + if err == nil || (tcnt > tlimit) { + break + } + } + + return conn, nil +} + func (op *Op) getMembers() map[string]struct{} { op.mtx.Lock() copy := make(map[string]struct{}) @@ -964,6 +1167,12 @@ func New(client *spanner.Client, hostPort, lockTable, lockName, logTable string, list.Shutdown() } + if op.grpcHostPort == "" { + host, port, _ := net.SplitHostPort(op.hostPort) + pi, _ := strconv.Atoi(port) + op.grpcHostPort = net.JoinHostPort(host, fmt.Sprintf("%v", pi+1)) + } + switch { case op.lockTimeout == 0: op.lockTimeout = 30000 // default 30s diff --git a/proto/v1/default.pb.go b/proto/v1/default.pb.go index 0e70bb6..0bb8963 100644 --- a/proto/v1/default.pb.go +++ b/proto/v1/default.pb.go @@ -20,14 +20,17 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type HelloRequest struct { +type Payload struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + Meta map[string]string `protobuf:"bytes,1,rep,name=meta,proto3" json:"meta,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } -func (x *HelloRequest) Reset() { - *x = HelloRequest{} +func (x *Payload) Reset() { + *x = Payload{} if protoimpl.UnsafeEnabled { mi := &file_default_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -35,13 +38,13 @@ func (x *HelloRequest) Reset() { } } -func (x *HelloRequest) String() string { +func (x *Payload) String() string { return protoimpl.X.MessageStringOf(x) } -func (*HelloRequest) ProtoMessage() {} +func (*Payload) ProtoMessage() {} -func (x *HelloRequest) ProtoReflect() protoreflect.Message { +func (x *Payload) ProtoReflect() protoreflect.Message { mi := &file_default_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -53,56 +56,23 @@ func (x *HelloRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use HelloRequest.ProtoReflect.Descriptor instead. -func (*HelloRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use Payload.ProtoReflect.Descriptor instead. +func (*Payload) Descriptor() ([]byte, []int) { return file_default_proto_rawDescGZIP(), []int{0} } -type HelloResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Response string `protobuf:"bytes,1,opt,name=response,proto3" json:"response,omitempty"` -} - -func (x *HelloResponse) Reset() { - *x = HelloResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_default_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *HelloResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HelloResponse) ProtoMessage() {} - -func (x *HelloResponse) ProtoReflect() protoreflect.Message { - mi := &file_default_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms +func (x *Payload) GetMeta() map[string]string { + if x != nil { + return x.Meta } - return mi.MessageOf(x) -} - -// Deprecated: Use HelloResponse.ProtoReflect.Descriptor instead. -func (*HelloResponse) Descriptor() ([]byte, []int) { - return file_default_proto_rawDescGZIP(), []int{1} + return nil } -func (x *HelloResponse) GetResponse() string { +func (x *Payload) GetData() []byte { if x != nil { - return x.Response + return x.Data } - return "" + return nil } var File_default_proto protoreflect.FileDescriptor @@ -110,19 +80,28 @@ var File_default_proto protoreflect.FileDescriptor var file_default_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x22, - 0x0e, 0x0a, 0x0c, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, - 0x2b, 0x0a, 0x0d, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x4f, 0x0a, 0x05, - 0x48, 0x65, 0x64, 0x67, 0x65, 0x12, 0x46, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x1c, - 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, - 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x68, - 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, - 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x2c, 0x5a, - 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x6c, 0x6f, 0x77, - 0x65, 0x72, 0x69, 0x6e, 0x74, 0x68, 0x65, 0x6e, 0x69, 0x67, 0x68, 0x74, 0x2f, 0x68, 0x65, 0x64, - 0x67, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x8d, 0x01, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x35, 0x0a, 0x04, 0x6d, + 0x65, 0x74, 0x61, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x68, 0x65, 0x64, 0x67, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x6d, 0x65, + 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x37, 0x0a, 0x09, 0x4d, 0x65, 0x74, 0x61, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, + 0x8c, 0x01, 0x0a, 0x05, 0x48, 0x65, 0x64, 0x67, 0x65, 0x12, 0x3e, 0x0a, 0x04, 0x53, 0x65, 0x6e, + 0x64, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x17, 0x2e, 0x68, 0x65, 0x64, + 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x43, 0x0a, 0x09, 0x42, 0x72, 0x6f, + 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x12, 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, + 0x17, 0x2e, 0x68, 0x65, 0x64, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, + 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x2c, + 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x6c, 0x6f, + 0x77, 0x65, 0x72, 0x69, 0x6e, 0x74, 0x68, 0x65, 0x6e, 0x69, 0x67, 0x68, 0x74, 0x2f, 0x68, 0x65, + 0x64, 0x67, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -139,17 +118,20 @@ func file_default_proto_rawDescGZIP() []byte { var file_default_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_default_proto_goTypes = []interface{}{ - (*HelloRequest)(nil), // 0: hedge.proto.v1.HelloRequest - (*HelloResponse)(nil), // 1: hedge.proto.v1.HelloResponse + (*Payload)(nil), // 0: hedge.proto.v1.Payload + nil, // 1: hedge.proto.v1.Payload.MetaEntry } var file_default_proto_depIdxs = []int32{ - 0, // 0: hedge.proto.v1.Hedge.Hello:input_type -> hedge.proto.v1.HelloRequest - 1, // 1: hedge.proto.v1.Hedge.Hello:output_type -> hedge.proto.v1.HelloResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 1, // 0: hedge.proto.v1.Payload.meta:type_name -> hedge.proto.v1.Payload.MetaEntry + 0, // 1: hedge.proto.v1.Hedge.Send:input_type -> hedge.proto.v1.Payload + 0, // 2: hedge.proto.v1.Hedge.Broadcast:input_type -> hedge.proto.v1.Payload + 0, // 3: hedge.proto.v1.Hedge.Send:output_type -> hedge.proto.v1.Payload + 0, // 4: hedge.proto.v1.Hedge.Broadcast:output_type -> hedge.proto.v1.Payload + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_default_proto_init() } @@ -159,19 +141,7 @@ func file_default_proto_init() { } if !protoimpl.UnsafeEnabled { file_default_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*HelloRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_default_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*HelloResponse); i { + switch v := v.(*Payload); i { case 0: return &v.state case 1: diff --git a/proto/v1/default.proto b/proto/v1/default.proto index b1aadef..e87394f 100644 --- a/proto/v1/default.proto +++ b/proto/v1/default.proto @@ -5,11 +5,11 @@ package hedge.proto.v1; option go_package = "github.com/flowerinthenight/hedge/proto/v1"; service Hedge { - rpc Hello(HelloRequest) returns (HelloResponse) {} + rpc Send(stream Payload) returns (stream Payload) {} + rpc Broadcast(stream Payload) returns (stream Payload) {} } -message HelloRequest {} - -message HelloResponse { - string response = 1; +message Payload { + map meta = 1; + bytes data = 2; } diff --git a/proto/v1/default_grpc.pb.go b/proto/v1/default_grpc.pb.go index 727d34e..107482c 100644 --- a/proto/v1/default_grpc.pb.go +++ b/proto/v1/default_grpc.pb.go @@ -18,7 +18,8 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type HedgeClient interface { - Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) + Send(ctx context.Context, opts ...grpc.CallOption) (Hedge_SendClient, error) + Broadcast(ctx context.Context, opts ...grpc.CallOption) (Hedge_BroadcastClient, error) } type hedgeClient struct { @@ -29,20 +30,74 @@ func NewHedgeClient(cc grpc.ClientConnInterface) HedgeClient { return &hedgeClient{cc} } -func (c *hedgeClient) Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) { - out := new(HelloResponse) - err := c.cc.Invoke(ctx, "/hedge.proto.v1.Hedge/Hello", in, out, opts...) +func (c *hedgeClient) Send(ctx context.Context, opts ...grpc.CallOption) (Hedge_SendClient, error) { + stream, err := c.cc.NewStream(ctx, &Hedge_ServiceDesc.Streams[0], "/hedge.proto.v1.Hedge/Send", opts...) if err != nil { return nil, err } - return out, nil + x := &hedgeSendClient{stream} + return x, nil +} + +type Hedge_SendClient interface { + Send(*Payload) error + Recv() (*Payload, error) + grpc.ClientStream +} + +type hedgeSendClient struct { + grpc.ClientStream +} + +func (x *hedgeSendClient) Send(m *Payload) error { + return x.ClientStream.SendMsg(m) +} + +func (x *hedgeSendClient) Recv() (*Payload, error) { + m := new(Payload) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *hedgeClient) Broadcast(ctx context.Context, opts ...grpc.CallOption) (Hedge_BroadcastClient, error) { + stream, err := c.cc.NewStream(ctx, &Hedge_ServiceDesc.Streams[1], "/hedge.proto.v1.Hedge/Broadcast", opts...) + if err != nil { + return nil, err + } + x := &hedgeBroadcastClient{stream} + return x, nil +} + +type Hedge_BroadcastClient interface { + Send(*Payload) error + Recv() (*Payload, error) + grpc.ClientStream +} + +type hedgeBroadcastClient struct { + grpc.ClientStream +} + +func (x *hedgeBroadcastClient) Send(m *Payload) error { + return x.ClientStream.SendMsg(m) +} + +func (x *hedgeBroadcastClient) Recv() (*Payload, error) { + m := new(Payload) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // HedgeServer is the server API for Hedge service. // All implementations must embed UnimplementedHedgeServer // for forward compatibility type HedgeServer interface { - Hello(context.Context, *HelloRequest) (*HelloResponse, error) + Send(Hedge_SendServer) error + Broadcast(Hedge_BroadcastServer) error mustEmbedUnimplementedHedgeServer() } @@ -50,8 +105,11 @@ type HedgeServer interface { type UnimplementedHedgeServer struct { } -func (UnimplementedHedgeServer) Hello(context.Context, *HelloRequest) (*HelloResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented") +func (UnimplementedHedgeServer) Send(Hedge_SendServer) error { + return status.Errorf(codes.Unimplemented, "method Send not implemented") +} +func (UnimplementedHedgeServer) Broadcast(Hedge_BroadcastServer) error { + return status.Errorf(codes.Unimplemented, "method Broadcast not implemented") } func (UnimplementedHedgeServer) mustEmbedUnimplementedHedgeServer() {} @@ -66,22 +124,56 @@ func RegisterHedgeServer(s grpc.ServiceRegistrar, srv HedgeServer) { s.RegisterService(&Hedge_ServiceDesc, srv) } -func _Hedge_Hello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(HelloRequest) - if err := dec(in); err != nil { +func _Hedge_Send_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HedgeServer).Send(&hedgeSendServer{stream}) +} + +type Hedge_SendServer interface { + Send(*Payload) error + Recv() (*Payload, error) + grpc.ServerStream +} + +type hedgeSendServer struct { + grpc.ServerStream +} + +func (x *hedgeSendServer) Send(m *Payload) error { + return x.ServerStream.SendMsg(m) +} + +func (x *hedgeSendServer) Recv() (*Payload, error) { + m := new(Payload) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(HedgeServer).Hello(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/hedge.proto.v1.Hedge/Hello", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HedgeServer).Hello(ctx, req.(*HelloRequest)) + return m, nil +} + +func _Hedge_Broadcast_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HedgeServer).Broadcast(&hedgeBroadcastServer{stream}) +} + +type Hedge_BroadcastServer interface { + Send(*Payload) error + Recv() (*Payload, error) + grpc.ServerStream +} + +type hedgeBroadcastServer struct { + grpc.ServerStream +} + +func (x *hedgeBroadcastServer) Send(m *Payload) error { + return x.ServerStream.SendMsg(m) +} + +func (x *hedgeBroadcastServer) Recv() (*Payload, error) { + m := new(Payload) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err } - return interceptor(ctx, in, info, handler) + return m, nil } // Hedge_ServiceDesc is the grpc.ServiceDesc for Hedge service. @@ -90,12 +182,20 @@ func _Hedge_Hello_Handler(srv interface{}, ctx context.Context, dec func(interfa var Hedge_ServiceDesc = grpc.ServiceDesc{ ServiceName: "hedge.proto.v1.Hedge", HandlerType: (*HedgeServer)(nil), - Methods: []grpc.MethodDesc{ + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Send", + Handler: _Hedge_Send_Handler, + ServerStreams: true, + ClientStreams: true, + }, { - MethodName: "Hello", - Handler: _Hedge_Hello_Handler, + StreamName: "Broadcast", + Handler: _Hedge_Broadcast_Handler, + ServerStreams: true, + ClientStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "default.proto", }