Skip to content

Commit d08c60f

Browse files
authored
Support defining variables and methods and fix some issues (#52)
* add mux as http router. add support to path methods and variables. refine some features. fix some issues. add tests. close #51 Signed-off-by: Lize Cai <[email protected]> * update tests Signed-off-by: Lize Cai <[email protected]> * fix typo, rename functions, and add test for structured and binary cloud event Signed-off-by: Lize Cai <[email protected]>
1 parent 4e00aaf commit d08c60f

26 files changed

+796
-40
lines changed

.github/workflows/main.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ jobs:
5555
e2e: "test/declarative/sync-http-multiple/e2e.yaml"
5656
- name: Declarative multiple Sync Cloudevent e2e test
5757
e2e: "test/declarative/sync-cloudevent-multiple/e2e.yaml"
58+
- name: Declarative multiple functions with variables e2e test
59+
e2e: "test/declarative/sync-http-variables/e2e.yaml"
5860
steps:
5961
- uses: actions/checkout@v2
6062

context/context.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
cloudevents "github.com/cloudevents/sdk-go/v2"
1717
dapr "github.com/dapr/go-sdk/client"
1818
"github.com/dapr/go-sdk/service/common"
19+
"github.com/gorilla/mux"
1920
"k8s.io/klog/v2"
2021
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
2122
)
@@ -938,6 +939,28 @@ func ConvertUserDataToBytes(data interface{}) []byte {
938939
}
939940
}
940941

942+
type contextKey int
943+
944+
const (
945+
varsKey contextKey = iota
946+
)
947+
948+
// Vars returns the route variables for the current request, if any.
949+
var (
950+
Vars = mux.Vars
951+
)
952+
953+
func CtxWithVars(ctx context.Context, vars map[string]string) context.Context {
954+
return context.WithValue(ctx, varsKey, vars)
955+
}
956+
957+
func VarsFromCtx(ctx context.Context) map[string]string {
958+
if rv := ctx.Value(varsKey); rv != nil {
959+
return rv.(map[string]string)
960+
}
961+
return nil
962+
}
963+
941964
func IsTracingProviderSkyWalking(ctx RuntimeContext) bool {
942965
if ctx.HasPluginsTracingCfg() && ctx.GetPluginsTracingCfg().IsEnabled() &&
943966
ctx.GetPluginsTracingCfg().ProviderName() == TracingProviderSkywalking {

context/context_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"os"
55
"strings"
66
"testing"
7+
"net/http"
8+
"reflect"
79
)
810

911
var (
@@ -292,3 +294,36 @@ func TestParseFunctionContext(t *testing.T) {
292294
t.Fatal("Error set function context env")
293295
}
294296
}
297+
298+
299+
func TestGetVarsFromContext(t *testing.T) {
300+
301+
tests := []struct {
302+
name string
303+
request *http.Request
304+
vars map[string]string
305+
}{
306+
{
307+
name: "single variable",
308+
request: &http.Request{},
309+
vars: map[string]string{"key1": "val1"},
310+
},
311+
{
312+
name: "multi variables",
313+
request: &http.Request{},
314+
vars: map[string]string{"key1": "val1", "key2": "val2"},
315+
},
316+
}
317+
for _, tt := range tests {
318+
t.Run(tt.name, func(t *testing.T) {
319+
r := tt.request
320+
ctx := r.Context()
321+
ctx = CtxWithVars(ctx, tt.vars)
322+
got := VarsFromCtx(ctx)
323+
if !reflect.DeepEqual(got, tt.vars) {
324+
t.Errorf("VarsFromCtx = %v, want %v", got, tt.vars)
325+
}
326+
})
327+
}
328+
329+
}

framework/framework.go

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package framework
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"net/http"
78
"os"
89

@@ -21,19 +22,21 @@ import (
2122
)
2223

2324
type functionsFrameworkImpl struct {
24-
funcContext ofctx.RuntimeContext
25-
prePlugins []plugin.Plugin
26-
postPlugins []plugin.Plugin
27-
pluginMap map[string]plugin.Plugin
28-
runtime runtime.Interface
29-
registry *registry.Registry
25+
funcContext ofctx.RuntimeContext
26+
funcContextMap map[string]ofctx.RuntimeContext
27+
prePlugins []plugin.Plugin
28+
postPlugins []plugin.Plugin
29+
pluginMap map[string]plugin.Plugin
30+
runtime runtime.Interface
31+
registry *registry.Registry
3032
}
3133

3234
// Framework is the interface for the function conversion.
3335
type Framework interface {
3436
Register(ctx context.Context, fn interface{}) error
3537
RegisterPlugins(customPlugins map[string]plugin.Plugin)
3638
Start(ctx context.Context) error
39+
TryRegisterFunctions(ctx context.Context) error
3740
GetRuntime() runtime.Interface
3841
}
3942

@@ -50,6 +53,8 @@ func NewFramework() (*functionsFrameworkImpl, error) {
5053
} else {
5154
fwk.funcContext = ctx
5255
}
56+
// for multi functions use cases
57+
fwk.funcContextMap = map[string]ofctx.RuntimeContext{}
5358

5459
// Scan the local directory and register the plugins if exist
5560
// Register the framework default plugins under `plugin` directory
@@ -100,7 +105,7 @@ func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{})
100105
return nil
101106
}
102107

103-
func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
108+
func (fwk *functionsFrameworkImpl) TryRegisterFunctions(ctx context.Context) error {
104109

105110
target := os.Getenv("FUNCTION_TARGET")
106111

@@ -110,14 +115,25 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
110115
klog.Infof("registering function: %s on path: %s", target, fn.GetPath())
111116
switch fn.GetFunctionType() {
112117
case functions.HTTPType:
113-
fwk.Register(ctx, fn.GetHTTPFunction())
118+
if err := fwk.Register(ctx, fn.GetHTTPFunction()); err != nil {
119+
klog.Errorf("failed to register function: %v", err)
120+
return err
121+
}
114122
case functions.CloudEventType:
115-
fwk.Register(ctx, fn.GetCloudEventFunction())
123+
if err := fwk.Register(ctx, fn.GetCloudEventFunction()); err != nil {
124+
klog.Errorf("failed to register function: %v", err)
125+
return err
126+
}
116127
case functions.OpenFunctionType:
117-
fwk.Register(ctx, fn.GetOpenFunctionFunction())
128+
if err := fwk.Register(ctx, fn.GetOpenFunctionFunction()); err != nil {
129+
klog.Errorf("failed to register function: %v", err)
130+
return err
131+
}
132+
default:
133+
return fmt.Errorf("Unkown function type: %s", fn.GetFunctionType())
118134
}
119135
} else {
120-
klog.Errorf("function not found: %s", target)
136+
return fmt.Errorf("function not found: %s", target)
121137
}
122138
} else {
123139
// if FUNCTION_TARGET is not provided but user uses declarative function, by default all registered functions will be deployed.
@@ -129,19 +145,26 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
129145
for _, name := range funcNames {
130146
if rf, ok := fwk.registry.GetRegisteredFunction(name); ok {
131147
klog.Infof("registering function: %s on path: %s", rf.GetName(), rf.GetPath())
148+
// Parse OpenFunction FunctionContext
149+
if ctx, err := ofctx.GetRuntimeContext(); err != nil {
150+
klog.Errorf("failed to parse OpenFunction FunctionContext: %v\n", err)
151+
return err
152+
} else {
153+
fwk.funcContextMap[rf.GetName()] = ctx
154+
}
132155
switch rf.GetFunctionType() {
133156
case functions.HTTPType:
134-
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
157+
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
135158
klog.Errorf("failed to register function: %v", err)
136159
return err
137160
}
138161
case functions.CloudEventType:
139-
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
162+
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
140163
klog.Errorf("failed to register function: %v", err)
141164
return err
142165
}
143166
case functions.OpenFunctionType:
144-
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
167+
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
145168
klog.Errorf("failed to register function: %v", err)
146169
return err
147170
}
@@ -150,8 +173,18 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
150173
}
151174
}
152175
}
176+
return nil
177+
}
178+
179+
func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
180+
181+
err := fwk.TryRegisterFunctions(ctx)
182+
if err != nil {
183+
klog.Error("failed to start registering functions")
184+
return err
185+
}
153186

154-
err := fwk.runtime.Start(ctx)
187+
err = fwk.runtime.Start(ctx)
155188
if err != nil {
156189
klog.Error("failed to start runtime service")
157190
return err

framework/framework_test.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/stretchr/testify/assert"
1919

2020
ofctx "github.com/OpenFunction/functions-framework-go/context"
21+
"github.com/OpenFunction/functions-framework-go/functions"
2122
"github.com/OpenFunction/functions-framework-go/runtime/async"
2223
)
2324

@@ -166,6 +167,157 @@ func TestCloudEventFunction(t *testing.T) {
166167
}
167168
}
168169

170+
func TestMultipleFunctions(t *testing.T) {
171+
env := `{
172+
"name": "function-demo",
173+
"version": "v1.0.0",
174+
"port": "8080",
175+
"runtime": "Knative",
176+
"httpPattern": "/"
177+
}`
178+
var ceDemo = struct {
179+
message map[string]string
180+
headers map[string]string
181+
}{
182+
message: map[string]string{
183+
"msg": "Hello World!",
184+
},
185+
headers: map[string]string{
186+
"Ce-Specversion": "1.0",
187+
"Ce-Type": "cloudevents.openfunction.samples.helloworld",
188+
"Ce-Source": "cloudevents.openfunction.samples/helloworldsource",
189+
"Ce-Id": "536808d3-88be-4077-9d7a-a3f162705f79",
190+
},
191+
}
192+
193+
ctx := context.Background()
194+
fwk, err := createFramework(env)
195+
if err != nil {
196+
t.Fatalf("failed to create framework: %v", err)
197+
}
198+
199+
fwk.RegisterPlugins(nil)
200+
201+
// register multiple functions
202+
functions.HTTP("http", fakeHTTPFunction,
203+
functions.WithFunctionPath("/http"),
204+
functions.WithFunctionMethods("GET"),
205+
)
206+
207+
functions.CloudEvent("ce", fakeCloudEventsFunction,
208+
functions.WithFunctionPath("/ce"),
209+
)
210+
211+
functions.OpenFunction("ofn", fakeBindingsFunction,
212+
functions.WithFunctionPath("/ofn"),
213+
functions.WithFunctionMethods("GET", "POST"),
214+
)
215+
216+
if err := fwk.TryRegisterFunctions(ctx); err != nil {
217+
t.Fatalf("failed to start registering functions: %v", err)
218+
}
219+
220+
if fwk.GetRuntime() == nil {
221+
t.Fatal("failed to create runtime")
222+
}
223+
handler := fwk.GetRuntime().GetHandler()
224+
if handler == nil {
225+
t.Fatal("handler is nil")
226+
}
227+
228+
srv := httptest.NewServer(handler.(http.Handler))
229+
defer srv.Close()
230+
231+
// test http
232+
t.Run("sending http", func(t *testing.T) {
233+
resp, err := http.Get(srv.URL + "/http")
234+
if err != nil {
235+
t.Fatalf("http.Get: %v", err)
236+
}
237+
238+
defer resp.Body.Close()
239+
body, err := ioutil.ReadAll(resp.Body)
240+
if err != nil {
241+
t.Fatalf("ioutil.ReadAll: %v", err)
242+
}
243+
244+
if got, want := string(body), "Hello World!"; got != want {
245+
t.Fatalf("TestHTTPFunction: got %v; want %v", got, want)
246+
}
247+
})
248+
249+
// test http to openfunction
250+
t.Run("sending http to openfunction", func(t *testing.T) {
251+
resp, err := http.Get(srv.URL + "/ofn")
252+
if err != nil {
253+
t.Fatalf("http.Get: %v", err)
254+
}
255+
256+
defer resp.Body.Close()
257+
body, err := ioutil.ReadAll(resp.Body)
258+
if err != nil {
259+
t.Fatalf("ioutil.ReadAll: %v", err)
260+
}
261+
262+
if got, want := string(body), "hello there"; got != want {
263+
t.Fatalf("TestHTTPFunction: got %v; want %v", got, want)
264+
}
265+
})
266+
267+
// test cloudevent
268+
t.Run("sending cloudevent", func(t *testing.T) {
269+
messageByte, err := json.Marshal(ceDemo.message)
270+
if err != nil {
271+
t.Fatalf("failed to marshal message: %v", err)
272+
}
273+
274+
req, err := http.NewRequest("POST", srv.URL+"/ce", bytes.NewBuffer(messageByte))
275+
if err != nil {
276+
t.Fatalf("error creating HTTP request for test: %v", err)
277+
}
278+
req.Header.Set("Content-Type", "application/json")
279+
for k, v := range ceDemo.headers {
280+
req.Header.Set(k, v)
281+
}
282+
client := &http.Client{}
283+
resp, err := client.Do(req)
284+
if err != nil {
285+
t.Fatalf("failed to do client.Do: %v", err)
286+
}
287+
288+
if resp.StatusCode != http.StatusOK {
289+
t.Fatalf("failed to test cloudevents function: response status = %v, want %v", resp.StatusCode, http.StatusOK)
290+
}
291+
})
292+
293+
// test cloudevent to openfunction
294+
t.Run("sending cloudevent to openfunction", func(t *testing.T) {
295+
messageByte, err := json.Marshal(ceDemo.message)
296+
if err != nil {
297+
t.Fatalf("failed to marshal message: %v", err)
298+
}
299+
300+
req, err := http.NewRequest("POST", srv.URL+"/ofn", bytes.NewBuffer(messageByte))
301+
if err != nil {
302+
t.Fatalf("error creating HTTP request for test: %v", err)
303+
}
304+
req.Header.Set("Content-Type", "application/json")
305+
for k, v := range ceDemo.headers {
306+
req.Header.Set(k, v)
307+
}
308+
client := &http.Client{}
309+
resp, err := client.Do(req)
310+
if err != nil {
311+
t.Fatalf("failed to do client.Do: %v", err)
312+
}
313+
314+
if resp.StatusCode != http.StatusOK {
315+
t.Fatalf("failed to test cloudevents function: response status = %v, want %v", resp.StatusCode, http.StatusOK)
316+
}
317+
})
318+
319+
}
320+
169321
func TestAsyncBindingsFunction(t *testing.T) {
170322
env := `{
171323
"name": "function-demo",

0 commit comments

Comments
 (0)