Skip to content

Commit 6640547

Browse files
authored
Merge pull request #5 from wrongerror/main
adjust buffer/retry mechanism
2 parents 0328157 + 4fec201 commit 6640547

File tree

6 files changed

+94
-104
lines changed

6 files changed

+94
-104
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
latest
1+
latest

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.18
44

55
require (
66
github.com/OpenFunction/functions-framework-go v0.5.0
7+
github.com/cenkalti/backoff/v4 v4.1.2
78
github.com/dapr/components-contrib v1.8.1-rc.1
89
github.com/dapr/dapr v1.8.3
910
github.com/dapr/go-sdk v1.5.0
@@ -25,7 +26,6 @@ require (
2526
github.com/andybalholm/brotli v1.0.4 // indirect
2627
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e // indirect
2728
github.com/beorn7/perks v1.0.1 // indirect
28-
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
2929
github.com/cespare/xxhash/v2 v2.1.2 // indirect
3030
github.com/cloudevents/sdk-go/v2 v2.4.1 // indirect
3131
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233 // indirect

main.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@ import (
88

99
ofctx "github.com/OpenFunction/functions-framework-go/context"
1010
"github.com/OpenFunction/functions-framework-go/framework"
11-
"github.com/cenkalti/backoff/v4"
1211
diag "github.com/dapr/dapr/pkg/diagnostics"
1312
"github.com/dapr/dapr/pkg/modes"
1413
"github.com/dapr/dapr/pkg/runtime"
15-
"github.com/pkg/errors"
1614
"k8s.io/klog/v2"
1715

1816
proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime"
@@ -77,36 +75,32 @@ func EventHandler(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
7775
start := time.Now()
7876
defer func() {
7977
elapsed := diag.ElapsedSince(start)
78+
pendingEventsCount := FuncRuntime.GetPendingEventsCount()
79+
klog.V(4).Infof("Input: %s - Pending Events Count: %v", ctx.GetInputName(), pendingEventsCount)
8080
klog.V(4).Infof("Input: %s - Event Forwarding Elapsed: %vms", ctx.GetInputName(), elapsed)
8181
}()
8282

8383
c := ctx.GetNativeContext()
84+
respCh := make(chan *proxyruntime.EventResponse, 1)
8485

8586
// Handle BindingEvent
8687
bindingEvent := ctx.GetBindingEvent()
8788
if bindingEvent != nil {
88-
FuncRuntime.EnqueueBindingEvent(&c, bindingEvent)
89+
event := proxyruntime.NewEvent(&c, bindingEvent, nil, respCh)
90+
FuncRuntime.EnqueueEvent(&event)
8991
}
9092

9193
// Handle TopicEvent
9294
topicEvent := ctx.GetTopicEvent()
9395
if topicEvent != nil {
94-
FuncRuntime.EnqueueTopicEvent(&c, topicEvent)
96+
event := proxyruntime.NewEvent(&c, nil, topicEvent, respCh)
97+
FuncRuntime.EnqueueEvent(&event)
9598
}
9699

97-
var resp *proxyruntime.EventResponse
98-
err := backoff.Retry(func() error {
99-
resp = FuncRuntime.GetEventResponse(&c)
100-
if resp == nil {
101-
return errors.New("Failed to get event response")
102-
}
103-
return nil
104-
}, utils.NewExponentialBackOff())
105-
106-
if err != nil {
107-
e := errors.New("Processing event timeout")
108-
klog.Error(e)
109-
return ctx.ReturnOnInternalError(), e
100+
resp := <-respCh
101+
if resp.Error != nil {
102+
klog.Error(resp.Error)
103+
return ctx.ReturnOnInternalError(), resp.Error
110104
} else {
111105
out := new(ofctx.FunctionOut)
112106
out.WithData(resp.Data)

pkg/grpc/grpc.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,16 @@ func (g *Manager) StartEndpointsDetection() {
100100
}
101101
}
102102

103-
for ep, _ := range endpoints {
103+
for ep := range endpoints {
104104
conn, teardown, err := g.getGRPCConnection(context.TODO(), ep.String(), "", "", true, false, g.sslEnabled)
105105
teardown()
106106
state := conn.GetState()
107107
if err != nil {
108108
klog.Error(err)
109109
} else if state == connectivity.Ready || state == connectivity.Idle {
110110
g.balancer.Add(ep)
111+
} else {
112+
g.balancer.Remove(ep)
111113
}
112114
}
113115
time.Sleep(200 * time.Millisecond)
@@ -131,7 +133,9 @@ func (g *Manager) GetGRPCConnection() (*grpc.ClientConn, func(), error) {
131133
state := conn.GetState()
132134
if state != connectivity.Ready && state != connectivity.Idle {
133135
g.balancer.Remove(address)
134-
delete(g.connectionPool.pool, address.String())
136+
g.lock.Lock()
137+
defer g.lock.Unlock()
138+
g.connectionPool.Remove(address.String())
135139
teardown()
136140
}
137141
}, nil
@@ -257,6 +261,13 @@ func (p *connectionPool) Register(address string, conn *grpc.ClientConn) {
257261
p.referenceCount[conn] = 2
258262
}
259263

264+
func (p *connectionPool) Remove(address string) {
265+
if conn, ok := p.pool[address]; ok {
266+
delete(p.pool, address)
267+
conn.Close()
268+
}
269+
}
270+
260271
func (p *connectionPool) Share(address string) (*grpc.ClientConn, bool) {
261272
conn, ok := p.pool[address]
262273
if !ok {

pkg/runtime/runtime.go

Lines changed: 65 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"encoding/json"
77
nethttp "net/http"
8-
"sync"
98

109
ofctx "github.com/OpenFunction/functions-framework-go/context"
1110
"github.com/cenkalti/backoff/v4"
@@ -33,40 +32,43 @@ type Config struct {
3332
MaxBufferSize int
3433
}
3534

36-
type EventRequest struct {
37-
ctx *context.Context
38-
ofctx.EventRequest
35+
type Event struct {
36+
ctx *context.Context
37+
bindingEvent *common.BindingEvent
38+
topicEvent *common.TopicEvent
39+
respCh chan *EventResponse
40+
}
41+
42+
func NewEvent(ctx *context.Context,
43+
bindingEvent *common.BindingEvent,
44+
topicEvent *common.TopicEvent,
45+
respCh chan *EventResponse) Event {
46+
return Event{
47+
ctx: ctx,
48+
bindingEvent: bindingEvent,
49+
topicEvent: topicEvent,
50+
respCh: respCh,
51+
}
3952
}
4053

4154
type EventResponse struct {
4255
Data []byte
4356
Error error
4457
}
4558

46-
type ResponseMap struct {
47-
l *sync.RWMutex
48-
m map[*context.Context]*EventResponse
49-
}
50-
5159
type Runtime struct {
5260
config *Config
5361
ctx *ofctx.FunctionContext
5462
grpc *grpc.Manager
5563
funcChannel channel.AppChannel
56-
reqChan chan *EventRequest
57-
respMap *ResponseMap
64+
events chan *Event
5865
}
5966

6067
func NewFuncRuntime(config *Config, ctx *ofctx.FunctionContext) *Runtime {
61-
lock := new(sync.RWMutex)
6268
return &Runtime{
63-
config: config,
64-
ctx: ctx,
65-
reqChan: make(chan *EventRequest, config.MaxBufferSize),
66-
respMap: &ResponseMap{
67-
l: lock,
68-
m: make(map[*context.Context]*EventResponse),
69-
},
69+
config: config,
70+
ctx: ctx,
71+
events: make(chan *Event, config.MaxBufferSize),
7072
}
7173
}
7274

@@ -89,76 +91,59 @@ func (r *Runtime) CreateFuncChannel() error {
8991
return nil
9092
}
9193

92-
func (r *Runtime) ProcessEvents() {
93-
for e := range r.reqChan {
94-
if e.BindingEvent != nil {
95-
var data []byte
96-
// Retry on connection error.
97-
err := backoff.Retry(func() error {
98-
var err error
99-
data, err = r.OnBindingEvent(e.ctx, e.BindingEvent)
100-
if err != nil {
101-
klog.V(4).Info(err)
102-
return err
103-
}
104-
return nil
105-
}, utils.NewExponentialBackOff())
94+
func (r *Runtime) GetPendingEventsCount() int {
95+
return len(r.events)
96+
}
10697

107-
resp := EventResponse{
108-
Data: data,
109-
Error: err,
110-
}
111-
r.respMap.l.Lock()
112-
r.respMap.m[e.ctx] = &resp
113-
r.respMap.l.Unlock()
114-
}
98+
func (r *Runtime) ProcessEvents() {
99+
for e := range r.events {
100+
if e.bindingEvent != nil {
101+
go func() {
102+
var data []byte
103+
// Retry on connection error.
104+
err := backoff.Retry(func() error {
105+
var err error
106+
data, err = r.OnBindingEvent(e.ctx, e.bindingEvent)
107+
if err != nil {
108+
klog.V(4).Info(err)
109+
return err
110+
}
111+
return nil
112+
}, utils.NewExponentialBackOff())
115113

116-
if e.TopicEvent != nil {
117-
// Retry on connection error.
118-
err := backoff.Retry(func() error {
119-
var err error
120-
err = r.OnTopicEvent(e.ctx, e.TopicEvent)
121-
if err != nil {
122-
return err
114+
resp := EventResponse{
115+
Data: data,
116+
Error: err,
123117
}
124-
return nil
125-
}, utils.NewExponentialBackOff())
126-
127-
resp := EventResponse{
128-
Data: nil,
129-
Error: err,
130-
}
131-
r.respMap.l.Lock()
132-
r.respMap.m[e.ctx] = &resp
133-
r.respMap.l.Unlock()
118+
e.respCh <- &resp
119+
}()
134120
}
135-
}
136-
}
137121

138-
func (r *Runtime) EnqueueBindingEvent(ctx *context.Context, event *common.BindingEvent) {
139-
req := EventRequest{
140-
ctx: ctx,
141-
EventRequest: ofctx.EventRequest{BindingEvent: event},
142-
}
143-
r.reqChan <- &req
144-
}
122+
if e.topicEvent != nil {
123+
go func() {
124+
// Retry on connection error.
125+
err := backoff.Retry(func() error {
126+
var err error
127+
err = r.OnTopicEvent(e.ctx, e.topicEvent)
128+
if err != nil {
129+
klog.V(4).Info(err)
130+
return err
131+
}
132+
return nil
133+
}, utils.NewExponentialBackOff())
145134

146-
func (r *Runtime) EnqueueTopicEvent(ctx *context.Context, event *common.TopicEvent) {
147-
req := EventRequest{
148-
ctx: ctx,
149-
EventRequest: ofctx.EventRequest{TopicEvent: event},
135+
resp := EventResponse{
136+
Data: nil,
137+
Error: err,
138+
}
139+
e.respCh <- &resp
140+
}()
141+
}
150142
}
151-
r.reqChan <- &req
152143
}
153144

154-
func (r *Runtime) GetEventResponse(ctx *context.Context) *EventResponse {
155-
defer r.respMap.l.Unlock()
156-
r.respMap.l.Lock()
157-
if resp, ok := r.respMap.m[ctx]; ok {
158-
delete(r.respMap.m, ctx)
159-
return resp
160-
}
161-
return nil
145+
func (r *Runtime) EnqueueEvent(event *Event) {
146+
r.events <- event
162147
}
163148

164149
func (r *Runtime) OnBindingEvent(ctx *context.Context, event *common.BindingEvent) ([]byte, error) {

pkg/utils/utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ func GetEnvVar(key, fallbackValue string) string {
6060
func NewExponentialBackOff() *backoff.ExponentialBackOff {
6161
b := &backoff.ExponentialBackOff{
6262
InitialInterval: 5 * time.Millisecond,
63-
RandomizationFactor: 0.5,
64-
Multiplier: 1.5,
65-
MaxInterval: 100 * time.Millisecond,
63+
RandomizationFactor: 0.2,
64+
Multiplier: 1,
65+
MaxInterval: 5 * time.Millisecond,
6666
MaxElapsedTime: 60 * time.Second,
6767
Stop: backoff.Stop,
6868
Clock: backoff.SystemClock,

0 commit comments

Comments
 (0)