-
Notifications
You must be signed in to change notification settings - Fork 127
/
source.go
139 lines (122 loc) · 3.42 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package yomo
import (
"context"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/id"
"github.com/yomorun/yomo/pkg/trace"
"go.opentelemetry.io/otel/attribute"
)
// Source is the handle an application uses to send data to YoMo.
type Source interface {
	// Connect to YoMo-Zipper.
	Connect() error
	// Close will close the connection to YoMo-Zipper.
	Close() error

	// Write sends data, labelled with tag, to the directed downstream.
	Write(tag uint32, data []byte) error
	// WriteWithTarget sends data, labelled with tag, to the sfn instance
	// identified by target.
	WriteWithTarget(tag uint32, data []byte, target string) error

	// SetErrorHandler registers fn as the function to call when a server
	// error occurs.
	SetErrorHandler(fn func(err error))
}
// yomoSource is the Source implementation returned by NewSource. It
// delegates connecting, writing and closing to a core.Client.
type yomoSource struct {
	// name is the source name given to NewSource; it is also passed to the
	// tracer when a span is started for a write.
	name string
	// zipperAddr is the YoMo-Zipper address given to NewSource.
	zipperAddr string
	// client is the underlying core client that talks to YoMo-Zipper.
	client *core.Client
}
// Compile-time check that *yomoSource satisfies the Source interface.
var _ Source = (*yomoSource)(nil)
// NewSource creates a yomo-source named name that targets the YoMo-Zipper at
// zipperAddr.
//
// Each SourceOption is converted to a core.ClientOption and handed to the
// underlying core client, whose logger is extended with the component,
// source_id, source_name and zipper_addr key/value pairs. NewSource itself
// does not call Connect.
func NewSource(name, zipperAddr string, opts ...SourceOption) Source {
	// Re-type the source options as the client options the core package expects.
	converted := make([]core.ClientOption, 0, len(opts))
	for _, opt := range opts {
		converted = append(converted, core.ClientOption(opt))
	}

	c := core.NewClient(name, zipperAddr, core.ClientTypeSource, converted...)

	// Tag every log line emitted through this client with the source identity.
	c.Logger = c.Logger.With(
		"component", core.ClientTypeSource.String(),
		"source_id", c.ClientID(),
		"source_name", c.Name(),
		"zipper_addr", zipperAddr,
	)

	src := &yomoSource{
		name:       name,
		zipperAddr: zipperAddr,
		client:     c,
	}
	return src
}
// Close closes the connection to YoMo-Zipper and shuts down the tracer
// provider.
//
// The tracer provider is shut down whether or not closing the client
// succeeds. If the client reports an error while closing, that error is
// returned to the caller instead of being discarded; otherwise Close logs a
// debug message and returns nil.
func (s *yomoSource) Close() error {
	err := s.client.Close()

	// Always release tracing, even when the client failed to close.
	trace.ShutdownTracerProvider()

	if err != nil {
		return err
	}

	s.client.Logger.Debug("the source is closed")
	return nil
}
// Connect connects the underlying client to YoMo-Zipper using a background
// context and returns the client's error, if any.
func (s *yomoSource) Connect() error {
	ctx := context.Background()
	return s.client.Connect(ctx)
}
// Write sends data in a data frame labelled with tag.
//
// It returns the error from frame.IsReservedTag when the tag is rejected,
// the metadata encoding error when the metadata cannot be encoded, or the
// error returned by the client when writing the frame.
func (s *yomoSource) Write(tag uint32, data []byte) error {
	if reserved := frame.IsReservedTag(tag); reserved != nil {
		return reserved
	}

	meta := core.NewMetadata(s.client.ClientID(), id.New())

	// Start a trace span for this write. The span is started before the
	// metadata is encoded and ended when Write returns.
	tr := trace.NewTracer("Source")
	sp := tr.Start(meta, s.name)
	defer tr.End(
		meta,
		sp,
		attribute.Int("send_data_tag", int(tag)),
		attribute.Int("send_data_len", len(data)),
	)

	// Serialize the metadata that travels alongside the payload.
	encoded, err := meta.Encode()
	if err != nil {
		return err
	}

	s.client.Logger.Debug("source write", "tag", tag, "dataLen", len(data))

	return s.client.WriteFrame(&frame.DataFrame{
		Tag:      tag,
		Metadata: encoded,
		Payload:  data,
	})
}
// WriteWithTarget sends data in a data frame labelled with tag, addressed to
// the sfn instance identified by target.
//
// When target is empty no target is set on the metadata. It returns the
// error from frame.IsReservedTag when the tag is rejected, the metadata
// encoding error when the metadata cannot be encoded, or the error returned
// by the client when writing the frame.
func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) error {
	if err := frame.IsReservedTag(tag); err != nil {
		return err
	}

	md := core.NewMetadata(s.client.ClientID(), id.New())

	// Start a trace span for this write; it is ended when the method returns.
	tracer := trace.NewTracer("Source")
	span := tracer.Start(md, s.name)
	defer tracer.End(
		md,
		span,
		attribute.Int("send_data_tag", int(tag)),
		attribute.String("send_data_target", target),
		attribute.Int("send_data_len", len(data)),
	)

	// Only record a target when the caller actually supplied one.
	if target != "" {
		core.SetMetadataTarget(md, target)
	}

	mdBytes, err := md.Encode()
	if err != nil {
		return err
	}

	f := &frame.DataFrame{
		Tag:      tag,
		Metadata: mdBytes,
		Payload:  data,
	}

	// Log the payload length rather than the payload itself, matching Write.
	s.client.Logger.Debug("source write with target", "tag", tag, "dataLen", len(data), "target", target)
	return s.client.WriteFrame(f)
}
// SetErrorHandler registers fn with the underlying client as the function to
// call when a server error occurs.
func (s *yomoSource) SetErrorHandler(fn func(err error)) {
	c := s.client
	c.SetErrorHandler(fn)
}