Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit 4cb9a5f

Browse files
authored
Merge pull request #41 from triggermesh/metrics
Prometheus metrics exporter
2 parents d0b6553 + 9aeb964 commit 4cb9a5f

File tree

7 files changed

+809
-34
lines changed

7 files changed

+809
-34
lines changed

go.mod

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,16 @@ module github.com/triggermesh/aws-custom-runtime
33
go 1.15
44

55
require (
6-
github.com/cloudevents/sdk-go/v2 v2.6.1
7-
github.com/google/uuid v1.2.0
6+
contrib.go.opencensus.io/exporter/prometheus v0.4.1
7+
github.com/cloudevents/sdk-go/v2 v2.8.0
8+
github.com/google/go-cmp v0.5.6 // indirect
9+
github.com/google/uuid v1.3.0
810
github.com/kelseyhightower/envconfig v1.4.0
11+
go.opencensus.io v0.23.0
12+
go.uber.org/atomic v1.9.0 // indirect
13+
go.uber.org/zap v1.19.1 // indirect
14+
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
15+
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect
16+
golang.org/x/tools v0.1.8 // indirect
17+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
918
)

go.sum

Lines changed: 504 additions & 16 deletions
Large diffs are not rendered by default.

main.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/kelseyhightower/envconfig"
3232

3333
"github.com/triggermesh/aws-custom-runtime/pkg/converter"
34+
"github.com/triggermesh/aws-custom-runtime/pkg/metrics"
3435
"github.com/triggermesh/aws-custom-runtime/pkg/sender"
3536
)
3637

@@ -73,8 +74,9 @@ type Specification struct {
7374
}
7475

7576
type Handler struct {
76-
Sender *sender.Sender
77-
Converter converter.Converter
77+
sender *sender.Sender
78+
converter converter.Converter
79+
reporter *metrics.EventProcessingStatsReporter
7880

7981
requestSizeLimit int64
8082
functionTTL int64
@@ -102,28 +104,41 @@ func setupEnv(internalAPIport string) error {
102104
}
103105

104106
func (h *Handler) serve(w http.ResponseWriter, r *http.Request) {
107+
eventTypeTag, eventSrcTag := metrics.DefaultRequestType, metrics.DefaultRequestSource
108+
start := time.Now()
109+
defer func() {
110+
h.reporter.ReportProcessingLatency(time.Since(start), eventTypeTag, eventSrcTag)
111+
}()
112+
105113
requestSizeLimitInBytes := h.requestSizeLimit * 1e+6
106114
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, requestSizeLimitInBytes))
107115
if err != nil {
116+
h.reporter.ReportProcessingError(false, eventTypeTag, eventSrcTag)
108117
http.Error(w, err.Error(), http.StatusRequestEntityTooLarge)
109118
return
110119
}
111120
defer r.Body.Close()
112121

113-
req, context, err := h.Converter.Request(body, r.Header)
122+
req, context, err := h.converter.Request(body, r.Header)
114123
if err != nil {
124+
h.reporter.ReportProcessingError(false, eventTypeTag, eventSrcTag)
115125
http.Error(w, err.Error(), http.StatusInternalServerError)
116126
return
117127
}
118128

129+
eventTypeTag, eventSrcTag = metrics.CETagsFromContext(context)
130+
119131
result := enqueue(req, context, h.functionTTL*1e+9)
120-
result.data, err = h.Converter.Response(result.data)
132+
result.data, err = h.converter.Response(result.data)
121133
if err != nil {
122134
result.data = []byte(fmt.Sprintf("Response conversion error: %v", err))
123135
}
124-
if err := h.Sender.Send(result.data, result.statusCode, w); err != nil {
136+
if err := h.sender.Send(result.data, result.statusCode, w); err != nil {
137+
h.reporter.ReportProcessingError(false, eventTypeTag, eventSrcTag)
125138
log.Printf("! %s %s %v\n", result.id, result.data, err)
139+
return
126140
}
141+
h.reporter.ReportProcessingSuccess(eventTypeTag, eventSrcTag)
127142
}
128143

129144
func enqueue(request []byte, context map[string]string, ttl int64) message {
@@ -134,7 +149,7 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
134149
data: request,
135150
context: context,
136151
}
137-
log.Printf("<- %s %s\n", task.id, task.data)
152+
log.Printf("<- %s\n", task.id)
138153

139154
resultsChannel := make(chan message)
140155
mutex.Lock()
@@ -158,7 +173,7 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
158173
mutex.Lock()
159174
delete(results, task.id)
160175
mutex.Unlock()
161-
log.Printf("-> %s %d %s\n", resp.id, resp.statusCode, resp.data)
176+
log.Printf("-> %s %d\n", resp.id, resp.statusCode)
162177
return resp
163178
}
164179

@@ -283,10 +298,17 @@ func main() {
283298
log.Fatalf("Cannot create converter: %v", err)
284299
}
285300

301+
// start metrics reporter
302+
mr, err := metrics.StatsExporter()
303+
if err != nil {
304+
log.Fatalf("Cannot start stats exporter: %v", err)
305+
}
306+
286307
// setup sender
287308
handler := Handler{
288-
Sender: sender.New(spec.Sink, conv.ContentType()),
289-
Converter: conv,
309+
sender: sender.New(spec.Sink, conv.ContentType()),
310+
converter: conv,
311+
reporter: mr,
290312
requestSizeLimit: spec.RequestSizeLimit,
291313
functionTTL: spec.FunctionTTL,
292314
}

main_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/kelseyhightower/envconfig"
1818
"github.com/triggermesh/aws-custom-runtime/pkg/converter"
19+
"github.com/triggermesh/aws-custom-runtime/pkg/metrics"
1920
"github.com/triggermesh/aws-custom-runtime/pkg/sender"
2021
)
2122

@@ -49,9 +50,16 @@ func TestNewTask(t *testing.T) {
4950
log.Fatalf("Cannot create converter: %v", err)
5051
}
5152

53+
// start metrics reporter
54+
mr, err := metrics.StatsExporter()
55+
if err != nil {
56+
log.Fatalf("Cannot start stats exporter: %v", err)
57+
}
58+
5259
handler := Handler{
53-
Sender: sender.New(s.Sink, conv.ContentType()),
54-
Converter: conv,
60+
sender: sender.New(s.Sink, conv.ContentType()),
61+
converter: conv,
62+
reporter: mr,
5563
requestSizeLimit: s.RequestSizeLimit,
5664
functionTTL: s.FunctionTTL,
5765
}

pkg/converter/cloudevents/cloudevents.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ import (
2929
"github.com/kelseyhightower/envconfig"
3030
)
3131

32-
const contentType = "application/cloudevents+json"
32+
// CloudEvents request constant attributes.
33+
const (
34+
ContentType = "application/cloudevents+json"
35+
ContextKey = "Lambda-Runtime-Cloudevents-Context"
36+
)
3337

3438
// CloudEvent is a data structure required to map KLR responses to cloudevents
3539
type CloudEvent struct {
@@ -141,7 +145,7 @@ func (ce *CloudEvent) Request(request []byte, headers http.Header) ([]byte, map[
141145
}
142146

143147
runtimeContext := map[string]string{
144-
"Lambda-Runtime-Cloudevents-Context": string(ceContext),
148+
ContextKey: string(ceContext),
145149
}
146150

147151
return body, runtimeContext, nil
@@ -170,13 +174,14 @@ func parseStructuredCE(body []byte) ([]byte, map[string]string, error) {
170174
func parseBinaryCE(headers http.Header) map[string]string {
171175
h := make(map[string]string)
172176
for k, v := range headers {
173-
if strings.HasPrefix(k, "Ce-") {
174-
h[strings.ToLower(k[3:])] = strings.Join(v, ",")
177+
k = strings.ToLower(k)
178+
if strings.HasPrefix(k, "ce-") {
179+
h[k[3:]] = strings.Join(v, ",")
175180
}
176181
}
177182
return h
178183
}
179184

180185
func (ce *CloudEvent) ContentType() string {
181-
return contentType
186+
return ContentType
182187
}

pkg/metrics/cloudevents.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
Copyright 2022 Triggermesh Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"encoding/json"
21+
22+
"github.com/triggermesh/aws-custom-runtime/pkg/converter/cloudevents"
23+
"go.opencensus.io/tag"
24+
)
25+
26+
const (
27+
typeAttr = "type"
28+
sourceAttr = "source"
29+
)
30+
31+
// Default metric tags for raw requests.
32+
var (
33+
DefaultRequestType = tag.Insert(tagKeyEventType, "plain-http")
34+
DefaultRequestSource = tag.Insert(tagKeyEventSource, "unknown")
35+
)
36+
37+
// CETagsFromContext parses Lambda context and returns CloudEvents-specific
38+
// type and source tags.
39+
func CETagsFromContext(context map[string]string) (tag.Mutator, tag.Mutator) {
40+
if context == nil {
41+
return DefaultRequestType, DefaultRequestSource
42+
}
43+
ceContext, exists := context[cloudevents.ContextKey]
44+
if !exists {
45+
return DefaultRequestType, DefaultRequestSource
46+
}
47+
var attributes map[string]string
48+
if err := json.Unmarshal([]byte(ceContext), &attributes); err != nil {
49+
return DefaultRequestType, DefaultRequestSource
50+
}
51+
return tag.Insert(tagKeyEventType, attributes[typeAttr]),
52+
tag.Insert(tagKeyEventSource, attributes[sourceAttr])
53+
}

0 commit comments

Comments
 (0)