-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathproducer.go
162 lines (137 loc) · 3.4 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package endpoint
import (
"context"
"errors"
"io"
"sync"
"github.com/usnistgov/ndn-dpdk/ndn"
"github.com/usnistgov/ndn-dpdk/ndn/an"
"github.com/usnistgov/ndn-dpdk/ndn/l3"
"go4.org/must"
)
// Error conditions.
var (
// ErrNoHandler is returned by Produce when ProducerOptions.Handler is nil.
//
//lint:ignore ST1005 'Handler' is a field name
ErrNoHandler = errors.New("Handler is missing")
)
// producerNackError is the error type produced by ReplyNack.
// Its underlying uint8 value is the Nack reason that the producer places
// into the Nack packet sent in reply to the Interest.
type producerNackError uint8

// Error implements the error interface.
// The message is fixed; it does not include the reason value.
func (producerNackError) Error() string {
	const text = "Nack"
	return text
}
// ReplyNack creates the error value that a ProducerHandler returns in order
// to make the producer reply to the Interest with a Nack packet.
// reason is the Nack reason carried in that packet.
func ReplyNack(reason uint8) error {
	var nackErr error = producerNackError(reason)
	return nackErr
}
// ProducerHandler is a producer handler function.
// - If it returns an error created with ReplyNack(), a Nack is sent in reply to the Interest.
// - If it returns a Data that satisfies the Interest, the Data is sent in reply to the Interest.
// - Otherwise, nothing is sent.
//
// The producer invokes the handler in a separate goroutine for each Interest
// under its prefix, so the handler may run concurrently with itself.
// ctx is derived from the producer's context with a timeout taken from
// interest.ApplyDefaultLifetime(); it is canceled when that timeout elapses,
// when the handler returns, or when the producer stops.
type ProducerHandler func(ctx context.Context, interest ndn.Interest) (ndn.Data, error)
// ProducerOptions contains arguments to Produce function.
type ProducerOptions struct {
// Prefix is the name prefix of the producer.
// Produce adds a route for this prefix on the producer's face.
// Received Interests whose name is not under this prefix are dropped
// without invoking Handler.
Prefix ndn.Name
// NoAdvertise disables prefix announcement.
// Default is announcing the prefix.
NoAdvertise bool
// Handler is a function to handle Interests under the prefix.
// This may be invoked concurrently.
// This field is required: Produce returns ErrNoHandler if it is nil.
Handler ProducerHandler
// Fw specifies the L3 Forwarder.
// Default is the default Forwarder.
// The value is passed to NewLFace as is.
Fw l3.Forwarder
// DataSigner automatically signs Data packets unless already signed.
// Default is keeping the Null signature.
// A Data is signed only when it has no SigInfo or its signature type is
// an.SigNull. If signing fails, the Data is not sent.
DataSigner ndn.Signer
}
// Produce starts a producer.
//
// It creates a face via NewLFace on opts.Fw, adds a route for opts.Prefix,
// announces the prefix unless opts.NoAdvertise is set, and then launches the
// receive loop in a background goroutine.
//
// The producer runs until ctx is canceled or the returned Producer is closed.
//
// Errors:
//   - ErrNoHandler if opts.Handler is nil.
//   - Any error returned by NewLFace.
func Produce(ctx context.Context, opts ProducerOptions) (Producer, error) {
	if opts.Handler == nil {
		return nil, ErrNoHandler
	}

	lface, err := NewLFace(opts.Fw)
	if err != nil {
		return nil, err
	}

	lface.FwFace.AddRoute(opts.Prefix)
	if advertise := !opts.NoAdvertise; advertise {
		lface.FwFace.AddAnnouncement(opts.Prefix)
	}

	// The loop runs on a child context so that Close can stop it without
	// affecting the caller's ctx.
	loopCtx, stop := context.WithCancel(ctx)
	prod := new(producer)
	prod.ProducerOptions = opts
	prod.face = lface
	prod.close = stop

	go prod.loop(loopCtx)
	return prod, nil
}
// Producer represents a running producer.
//
// Closing the Producer returned by Produce cancels the context that drives
// its receive loop; that Close always returns nil.
type Producer interface {
io.Closer
}
// producer is the Producer implementation returned by Produce.
type producer struct {
// ProducerOptions holds the options passed to Produce.
// It is embedded so Prefix, Handler, and DataSigner are read directly from the producer.
ProducerOptions
// face is the LFace created by NewLFace in Produce.
// loop receives packets from face.Rx(), handleInterest sends replies to face.Tx(),
// and loop closes the face when it exits.
face *LFace
// close cancels the context that drives loop.
close context.CancelFunc
}
// Close stops the producer by canceling the context that drives its receive
// loop. It does not wait for the loop to finish and always returns nil.
func (p *producer) Close() error {
	stop := p.close
	stop()
	return nil
}
// loop is the producer's receive loop; Produce runs it in its own goroutine.
//
// Until ctx is done, it reads packets from the face's Rx channel and starts
// one handleInterest goroutine per packet that carries an Interest. Packets
// without an Interest are ignored.
//
// On exit it waits for every started handleInterest goroutine to return,
// then closes the face and cancels the producer's context.
func (p *producer) loop(ctx context.Context) {
	var inflight sync.WaitGroup
	// The loop itself holds one count, released when ctx is done.
	inflight.Add(1)
	defer func() {
		inflight.Wait()
		must.Close(p.face)
		p.close()
	}()

	for {
		select {
		case <-ctx.Done():
			inflight.Done()
			return
		case l3pkt := <-p.face.Rx():
			if pkt := l3pkt.ToPacket(); pkt.Interest != nil {
				inflight.Add(1)
				go p.handleInterest(ctx, &inflight, pkt)
			}
		}
	}
}
// handleInterest processes one received Interest and transmits the reply, if any.
//
// It is started by loop in its own goroutine. wg.Done is called on return so
// that loop can wait for in-flight handlers before closing the face.
//
// Outcomes:
//   - The Interest name is not under p.Prefix: dropped without invoking the handler.
//   - The handler returns an error that is, or wraps, an error created with
//     ReplyNack: a Nack carrying that reason is sent.
//   - The handler returns any other error: nothing is sent.
//   - The handler returns a Data that can satisfy the Interest: the Data is
//     signed with p.DataSigner when it has no SigInfo or a Null signature,
//     then sent with the LpL3 header fields of the incoming packet.
//   - The Data cannot satisfy the Interest, or signing fails: nothing is sent.
func (p *producer) handleInterest(ctx context.Context, wg *sync.WaitGroup, pkt *ndn.Packet) {
	defer wg.Done()

	interest := pkt.Interest
	if !p.Prefix.IsPrefixOf(interest.Name) {
		return
	}

	// The handler gets a context bounded by the duration returned from
	// interest.ApplyDefaultLifetime().
	ctx1, cancel1 := context.WithTimeout(ctx, interest.ApplyDefaultLifetime())
	defer cancel1()
	data, e := p.Handler(ctx1, *interest)

	var reply *ndn.Packet
	if e != nil {
		// errors.As, unlike a direct type assertion, also recognizes a
		// ReplyNack error that the handler wrapped with additional context.
		var nackError producerNackError
		if errors.As(e, &nackError) {
			nack := ndn.MakeNack(interest, uint8(nackError))
			reply = nack.ToPacket()
		}
	} else if data.CanSatisfy(*interest) {
		if (data.SigInfo == nil || data.SigInfo.Type == an.SigNull) && p.DataSigner != nil {
			if e := p.DataSigner.Sign(&data); e != nil {
				// Signing failed: drop the reply instead of sending Data
				// that was not signed as configured.
				return
			}
		}
		reply = data.ToPacket()
		reply.Lp = pkt.Lp
	}

	if reply == nil {
		return
	}

	// Wait on the producer-level ctx rather than ctx1, so that the send is
	// abandoned only when the producer stops, not when the handler deadline
	// has passed.
	select {
	case <-ctx.Done():
	case p.face.Tx() <- reply:
	}
}