Skip to content

Commit f0ece34

Browse files
authored
refactor!: JetStream source and sink connectors (#109)
1 parent f1fe352 commit f0ece34

File tree

3 files changed

+244
-114
lines changed

3 files changed

+244
-114
lines changed

examples/nats/main.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@ import (
1010
"github.com/nats-io/nats.go"
1111
"github.com/nats-io/stan.go"
1212
"github.com/reugn/go-streams/extension"
13-
ext "github.com/reugn/go-streams/nats"
14-
1513
"github.com/reugn/go-streams/flow"
14+
ext "github.com/reugn/go-streams/nats"
1615
)
1716

1817
func main() {
@@ -34,17 +33,75 @@ func jetStream() {
3433
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
3534
defer cancel()
3635

37-
fileSource := extension.NewFileSource("in.txt")
38-
toUpperMapFlow := flow.NewMap(toUpperString, 1)
39-
jetSink, err := ext.NewJetStreamSink("stream1", "stream1.subject1", "nats://localhost:4222")
36+
// connect to the NATS server
37+
nc, err := nats.Connect("nats://localhost:4222")
4038
if err != nil {
4139
log.Fatal(err)
4240
}
4341

44-
jetSource, err := ext.NewJetStreamSource(ctx, "stream1.subject1", "nats://localhost:4222")
42+
// create JetStreamContext
43+
js, err := nc.JetStream()
4544
if err != nil {
4645
log.Fatal(err)
4746
}
47+
48+
streamName := "stream1"
49+
subjectName := "stream1.subject1"
50+
51+
// check if the stream already exists; if not, create it
52+
stream, _ := js.StreamInfo(streamName)
53+
if stream == nil {
54+
// create stream
55+
// for the set of stream configuration options, see:
56+
// https://docs.nats.io/nats-concepts/jetstream/streams#configuration
57+
_, err = js.AddStream(&nats.StreamConfig{
58+
Name: streamName,
59+
Subjects: []string{subjectName},
60+
DiscardNewPerSubject: true, // exactly-once semantics
61+
MaxMsgsPerSubject: 1024,
62+
Discard: nats.DiscardNew,
63+
})
64+
if err != nil {
65+
log.Fatal(err)
66+
}
67+
log.Printf("Stream %s has been created", streamName)
68+
}
69+
70+
// create a new JetStream source connector
71+
sourceConfig := &ext.JetStreamSourceConfig{
72+
Conn: nc,
73+
JetStreamCtx: js,
74+
Subject: subjectName,
75+
ConsumerName: "JetStreamSource",
76+
FetchBatchSize: 64,
77+
Ack: true,
78+
SubOpts: []nats.SubOpt{
79+
nats.PullMaxWaiting(128),
80+
},
81+
PullOpts: []nats.PullOpt{
82+
nats.Context(ctx), // sets deadline for fetch
83+
},
84+
}
85+
jetSource, err := ext.NewJetStreamSource(ctx, sourceConfig)
86+
if err != nil {
87+
log.Fatal(err)
88+
}
89+
90+
fileSource := extension.NewFileSource("in.txt")
91+
toUpperMapFlow := flow.NewMap(toUpperString, 1)
92+
93+
// create a new JetStream sink connector
94+
sinkConfig := &ext.JetStreamSinkConfig{
95+
Conn: nc,
96+
JetStreamCtx: js,
97+
Subject: subjectName,
98+
PubOpts: []nats.PubOpt{nats.Context(ctx)},
99+
}
100+
jetSink, err := ext.NewJetStreamSink(sinkConfig)
101+
if err != nil {
102+
log.Fatal(err)
103+
}
104+
48105
fetchJetMsgMapFlow := flow.NewMap(fetchJetMsg, 1)
49106
stdOutSInk := extension.NewStdoutSink()
50107

0 commit comments

Comments
 (0)