Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service streaming example in Go #68

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/nbe/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/bruth/nats-by-example/cmd/nbe

go 1.18
go 1.19

require (
github.com/alecthomas/chroma v0.10.0
Expand Down
6 changes: 3 additions & 3 deletions docker/docker-compose.cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.9'
services:
nats1:
image: docker.io/nats:2.9.0
image: docker.io/nats:2.9.8
command:
- "--debug"
- "--name=nats1"
Expand All @@ -15,7 +15,7 @@ services:
- "18222:8222"

nats2:
image: docker.io/nats:2.9.0
image: docker.io/nats:2.9.8
command:
- "--debug"
- "--name=nats2"
Expand All @@ -29,7 +29,7 @@ services:
- "28222:8222"

nats3:
image: docker.io/nats:2.9.0
image: docker.io/nats:2.9.8
command:
- "--debug"
- "--name=nats3"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.9'
services:
nats:
image: docker.io/nats:2.9.0
image: docker.io/nats:2.9.8
command:
- "--debug"
- "--http_port=8222"
Expand Down
2 changes: 1 addition & 1 deletion docker/go/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.18-alpine AS build
FROM golang:1.19-alpine AS build

WORKDIR /opt/app

Expand Down
10 changes: 3 additions & 7 deletions docker/go/go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
module github.com/ConnectEverything/nats-by-example/go

go 1.18

require github.com/nats-io/nats.go v1.16.0
go 1.19

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/nats-io/nats-server/v2 v2.8.4 // indirect
github.com/nats-io/nats.go v1.20.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
google.golang.org/protobuf v1.28.0 // indirect
golang.org/x/crypto v0.3.0 // indirect
)
24 changes: 4 additions & 20 deletions docker/go/go.sum
Original file line number Diff line number Diff line change
@@ -1,30 +1,14 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.20.0 h1:T8JJnQfVSdh1CzGiwAOv5hEobYCBho/0EupGznYw0oM=
github.com/nats-io/nats.go v1.20.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.3.0 h1:a06MkbcxBrEFc0w0QIZWXrH/9cCX6KJyWbBOIwAn+7A=
golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
85 changes: 85 additions & 0 deletions examples/messaging/service-streaming/go/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"fmt"
"math/rand"
"os"
"strconv"
"time"

"github.com/nats-io/nats.go"
)

func main() {
// Use the env varibale if running in the container, otherwise use the default.
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

// Create an unauthenticated connection to NATS.
nc, _ := nats.Connect(url)
defer nc.Drain()

// Setup a simple random number generator (RNG) service that
// streams numbers. Using a queue group ensures that only one
// of the members will receive the request to process.
nc.QueueSubscribe("rng", "rng", func(msg *nats.Msg) {
// Do request validation, etc. If there are any issues, a response
// can be sent with any error prior to the stream starting.
n, _ := strconv.ParseInt(string(msg.Data), 10, 64)

// Extract the unique subject to stream messages on.
subject := msg.Header.Get("x-stream-subject")

// Respond to the initial request with any errors. Otherwise an
// empty reply indicates the stream will start.
msg.Respond(nil)

// Stream the numbers to the client. A publish is used here,
// however, if ack'ing is desired, a request could be used
// per message.
for i := 0; i < int(n); i++ {
r := rand.Intn(100)
nc.Publish(subject, []byte(fmt.Sprintf("%d", r)))
}

// Publish empty data to indicate end-of-stream.
nc.Publish(subject, nil)
})

// Generate a unique inbox subject for the stream of messages
// and subscribe to it.
inbox := nc.NewInbox()
sub, _ := nc.SubscribeSync(inbox)

// Prepare the message to initiate the interaction. The inbox
// subject is included in the header for the service to extract
// and publish to.
msg := nats.NewMsg("rng")
msg.Header.Set("x-stream-subject", inbox)
msg.Data = []byte("10")

// Send the request to initiate the interaction.
nc.RequestMsg(msg, time.Second)

// Loop to receive all messages over the stream.
for {
msg, err := sub.NextMsg(time.Second)
// Handle error.
if err != nil {
sub.Unsubscribe()
// handle error
break
}

// Indicates the end of the stream.
if len(msg.Data) == 0 {
sub.Unsubscribe()
break
}

// Print the random number.
fmt.Println(string(msg.Data))
}
}
31 changes: 31 additions & 0 deletions examples/messaging/service-streaming/meta.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
title: Service streaming pattern
description: |-
This messaging pattern builds upon the built-in [request-reply][req-rep] API.

Prior to the client sending the request, it must generate a unique subject
to be the dedicated *inbox* for receiving the stream of messages from the
service.

The simpliest way to achieve this is to use the client connection's
`NewInbox()` method which also ensures any connection-level inbox prefix
is already prepended.

This unique subject is then added as a header, such as `x-stream-subject`.
Prior to sending the request, the subscription on that subject must be setup
by the client so it can receive messages as soon as the client sends the
request.

When the service receives the request, it can do any request validation, etc.
and perform the standard reply indicating the streaming will be begin.
Now the service can freely publish messages (or send requests if acks are desired)
on the subject passed within the header.

To indicate the end-of-stream, the last message provides an empty message body
with an optional set of headers indicating any status, such as an error
had occurred mid-stream.

For those familiar with gRPC, this is analogous to the
[server streaming RPC][grpc].

[req-rep]: /examples/messaging/request-reply/go
[grpc]: https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc