diff --git a/context/context.go b/context/context.go index 69da43c..2c2d7f1 100644 --- a/context/context.go +++ b/context/context.go @@ -17,7 +17,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" dapr "github.com/dapr/go-sdk/client" "github.com/dapr/go-sdk/service/common" - "github.com/gorilla/mux" + "github.com/go-chi/chi/v5" "k8s.io/klog/v2" agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) @@ -966,11 +966,9 @@ const ( varsKey contextKey = iota ) -// Vars returns the route variables for the current request, if any. -var ( - Vars = mux.Vars -) - +// CtxWithVars is just for backward compatibility of VarsFromCtx to the previous implementation using gorilla/mux +// CtxWithVars adds URL Parameters into Context and user can get them by VarsFromCtx +// However, the recommended way to read URL Parameter in go-chi/chi is using URLParamFromCtx func CtxWithVars(ctx context.Context, vars map[string]string) context.Context { return context.WithValue(ctx, varsKey, vars) } @@ -982,6 +980,24 @@ func VarsFromCtx(ctx context.Context) map[string]string { return nil } +// URLParamFromCtx returns the url parameter from a http.Request Context. +func URLParamFromCtx(ctx context.Context, key string) string { + return chi.URLParamFromCtx(ctx, key) +} + +// URLParamsFromCtx returns all the url parameters from a http.Request Context. +func URLParamsFromCtx(ctx context.Context) map[string]string { + res := map[string]string{} + if rctx := chi.RouteContext(ctx); rctx != nil { + for k := 0; k < len(rctx.URLParams.Keys); k++ { + key := rctx.URLParams.Keys[k] + val := rctx.URLParams.Values[k] + res[key] = val + } + } + return res +} + func IsTracingProviderSkyWalking(ctx RuntimeContext) bool { if ctx.HasPluginsTracingCfg() && ctx.GetPluginsTracingCfg().IsEnabled() && ctx.GetPluginsTracingCfg().ProviderName() == TracingProviderSkywalking { diff --git a/context/context_test.go b/context/context_test.go index a9dd56f..c114673 100644 --- a/context/context_test.go +++ b/context/context_test.go @@ -1,11 +1,11 @@ package context import ( + "net/http" "os" + "reflect" "strings" "testing" - "net/http" - "reflect" ) var ( @@ -108,8 +108,8 @@ var ( // TestParseFunctionContext tests and verifies the function that parses the function FunctionContext func TestParseFunctionContext(t *testing.T) { _, err := GetRuntimeContext() - if !strings.Contains(err.Error(), "env FUNC_CONTEXT not found") { - t.Fatal("Error parse function context") + if err != nil && !strings.Contains(err.Error(), "env FUNC_CONTEXT not found") { + t.Fatalf("Error parse function context: %s", err.Error()) } // test `podName`, `podNamespace` field @@ -295,7 +295,6 @@ func TestParseFunctionContext(t *testing.T) { } } - func TestGetVarsFromContext(t *testing.T) { tests := []struct { @@ -304,14 +303,14 @@ func TestGetVarsFromContext(t *testing.T) { vars map[string]string }{ { - name: "single variable", + name: "single variable", request: &http.Request{}, - vars: map[string]string{"key1": "val1"}, + vars: map[string]string{"key1": "val1"}, }, { - name: "multi variables", + name: "multi variables", request: &http.Request{}, - vars: map[string]string{"key1": "val1", "key2": "val2"}, + vars: map[string]string{"key1": "val1", "key2": "val2"}, }, } for _, tt := range tests { @@ -326,4 +325,4 @@ func TestGetVarsFromContext(t *testing.T) { }) } -} \ No newline at end of file +} diff --git a/go.mod b/go.mod index 1c1a37d..2cb2dc4 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,9 @@ require ( github.com/dapr/dapr v1.8.3 github.com/dapr/go-sdk v1.5.0 github.com/fatih/structs v1.1.0 + github.com/go-chi/chi/v5 v5.0.8 github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 - github.com/gorilla/mux v1.8.0 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.4 google.golang.org/grpc v1.47.0 @@ -21,6 +21,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.2.3 // indirect + github.com/gorilla/mux v1.8.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index 9f6cf14..3e9b47e 100644 --- a/go.sum +++ b/go.sum @@ -646,8 +646,11 @@ github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U= github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= +github.com/go-chi/chi v4.0.2+incompatible h1:maB6vn6FqCxrpz4FqWdh4+lwpyZIQS7YEAUcHlgXVRs= github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs= +github.com/go-chi/chi/v5 v5.0.8 h1:lD+NLqFcAi1ovnVZpsnObHGW4xb4J8lNmoYVfECH1Y0= +github.com/go-chi/chi/v5 v5.0.8/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-co-op/gocron v1.9.0/go.mod h1:DbJm9kdgr1sEvWpHCA7dFFs/PGHPMil9/97EXCRPr4k= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-errors/errors v1.4.0/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= diff --git a/runtime/knative/knative.go b/runtime/knative/knative.go index f80baff..8351e8f 100644 --- a/runtime/knative/knative.go +++ b/runtime/knative/knative.go @@ -10,7 +10,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" - "github.com/gorilla/mux" + "github.com/go-chi/chi/v5" "k8s.io/klog/v2" ofctx "github.com/OpenFunction/functions-framework-go/context" @@ -30,7 +30,7 @@ const ( type Runtime struct { port string pattern string - handler *mux.Router + handler *chi.Mux } func NewKnativeRuntime(port string, pattern string) *Runtime { @@ -40,7 +40,7 @@ func NewKnativeRuntime(port string, pattern string) *Runtime { return &Runtime{ port: port, pattern: pattern, - handler: mux.NewRouter(), + handler: chi.NewRouter(), } } @@ -61,11 +61,10 @@ func (r *Runtime) RegisterOpenFunction( ctx.InitDaprClientIfNil() } - // Register the synchronous function (based on Knaitve runtime) - route := r.handler.HandleFunc(rf.GetPath(), func(w http.ResponseWriter, r *http.Request) { + fn := func(w http.ResponseWriter, r *http.Request) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) // save the Vars into the context - _ctx := ofctx.CtxWithVars(r.Context(), ofctx.Vars(r)) + _ctx := ofctx.CtxWithVars(r.Context(), ofctx.URLParamsFromCtx(r.Context())) rm.FuncContext.SetNativeContext(_ctx) rm.FuncContext.SetSyncRequest(w, r.WithContext(_ctx)) defer RecoverPanicHTTP(w, "Function panic") @@ -84,12 +83,17 @@ func (r *Runtime) RegisterOpenFunction( default: return } - }) + } - // add methods matcher if provided methods := rf.GetFunctionMethods() + // Register the synchronous function (based on Knaitve runtime) if len(methods) > 0 { - route.Methods(methods...) + // add methods matcher if provided + for _, method := range methods { + r.handler.MethodFunc(method, rf.GetPath(), fn) + } + } else { + r.handler.HandleFunc(rf.GetPath(), fn) } return nil @@ -101,20 +105,24 @@ func (r *Runtime) RegisterHTTPFunction( postPlugins []plugin.Plugin, rf *functions.RegisteredFunction, ) error { - route := r.handler.HandleFunc(rf.GetPath(), func(w http.ResponseWriter, r *http.Request) { + fn := func(w http.ResponseWriter, r *http.Request) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) // save the Vars into the context - _ctx := ofctx.CtxWithVars(r.Context(), ofctx.Vars(r)) + _ctx := ofctx.CtxWithVars(r.Context(), ofctx.URLParamsFromCtx(r.Context())) rm.FuncContext.SetNativeContext(_ctx) rm.FuncContext.SetSyncRequest(w, r.WithContext(_ctx)) defer RecoverPanicHTTP(w, "Function panic") rm.FunctionRunWrapperWithHooks(rf.GetHTTPFunction()) - }) + } - // add methods matcher if any methods := rf.GetFunctionMethods() if len(methods) > 0 { - route.Methods(methods...) + // add methods matcher if provided + for _, method := range methods { + r.handler.MethodFunc(method, rf.GetPath(), fn) + } + } else { + r.handler.HandleFunc(rf.GetPath(), fn) } return nil @@ -150,8 +158,8 @@ func (r *Runtime) RegisterCloudEventFunction( // function to extract Vars and add into ctx withVars := func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := ofctx.CtxWithVars(r.Context(), ofctx.Vars(r)) - next.ServeHTTP(w, r.WithContext(ctx)) + _ctx := ofctx.CtxWithVars(r.Context(), ofctx.URLParamsFromCtx(r.Context())) + next.ServeHTTP(w, r.WithContext(_ctx)) }) } r.handler.Handle(rf.GetPath(), withVars(handleFn)) diff --git a/test/bindings/e2e.yaml b/test/bindings/e2e.yaml index b316247..db7c75b 100644 --- a/test/bindings/e2e.yaml +++ b/test/bindings/e2e.yaml @@ -19,7 +19,7 @@ setup: - name: install kafka operator command: | helm repo add strimzi https://strimzi.io/charts/ - helm install kafka-operator -n default strimzi/strimzi-kafka-operator + helm install kafka-operator -n default strimzi/strimzi-kafka-operator --version 0.35.0 - name: install kafka path: ../kafka.yaml @@ -34,6 +34,14 @@ setup: - name: setup manifests path: manifests.yaml wait: + - namespace: default + resource: deployments + label-selector: app=bindings-sender + for: condition=Available + - namespace: default + resource: deployments + label-selector: app=bindings-target + for: condition=Available - namespace: default resource: pod label-selector: app=bindings-sender diff --git a/test/declarative/sync-http-variables/README.md b/test/declarative/sync-http-variables/README.md new file mode 100644 index 0000000..674ed87 --- /dev/null +++ b/test/declarative/sync-http-variables/README.md @@ -0,0 +1,72 @@ +# Sync-HTTP-Variables + +To run test cases locally + +## Run locally + +```sh +$ cd test/declarative/sync-http-variables +$ go run main.go http.go +``` + +## Send request + +### HTTP + +```sh +$ curl -X POST "http://localhost:8080/hello/openfunction?key1=value1" -d 'hello' + +{"hello":"openfunction"}% +``` + +### CloudEvent + +```sh +# binary +$ curl "http://localhost:8080/foo/openfunction" \ + -H "Ce-Specversion: 1.0" \ + -H "Ce-Type: io.openfunction.samples.helloworld" \ + -H "Ce-Source: io.openfunction.samples/helloworldsource" \ + -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \ + -H "Content-Type: application/json" \ + -d '{"data":"hello"}' + +I0605 01:38:57.481196 81001 http.go:87] cloudevent - Data: {"hello":"openfunction"} +I0605 01:38:57.481310 81001 plugin-example.go:88] plugin - Result: {"sum":2} + +# structured +$ curl "http://localhost:8080/foo/openfunction" \ + -H "Content-Type: application/cloudevents+json" \ + -d '{"specversion":"1.0","type":"io.openfunction.samples.helloworld","source":"io.openfunction.samples/helloworldsource","id":"536808d3-88be-4077-9d7a-a3f162705f79","data":{"data":"hello"}}' + +I0605 01:46:52.336317 81001 http.go:87] cloudevent - Data: {"hello":"openfunction"} +I0605 01:46:52.336342 81001 plugin-example.go:88] plugin - Result: {"sum":2} +``` + +### OpenFunction + +```sh +# HTTP +$ curl -X GET "http://localhost:8080/bar/openfunction?key1=value1" -d '{"data":"hello"}' + +{"hello":"openfunction"}% + +# CloudEvent +## binary +$ curl "http://localhost:8080/bar/openfunction" \ + -H "Ce-Specversion: 1.0" \ + -H "Ce-Type: io.openfunction.samples.helloworld" \ + -H "Ce-Source: io.openfunction.samples/helloworldsource" \ + -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" \ + -H "Content-Type: application/json" \ + -d '{"data":"hello"}' + +{"hello":"openfunction"} + +## structured +$ curl "http://localhost:8080/bar/openfunction" \ + -H "Content-Type: application/cloudevents+json" \ + -d '{"specversion":"1.0","type":"io.openfunction.samples.helloworld","source":"io.openfunction.samples/helloworldsource","id":"536808d3-88be-4077-9d7a-a3f162705f79","data":{"data":"hello"}}' + +{"hello":"openfunction"}% +``` \ No newline at end of file diff --git a/test/declarative/sync-http-variables/http.go b/test/declarative/sync-http-variables/http.go index d706bf2..78c2313 100644 --- a/test/declarative/sync-http-variables/http.go +++ b/test/declarative/sync-http-variables/http.go @@ -46,9 +46,9 @@ type Message struct { } func hello(w http.ResponseWriter, r *http.Request) { - vars := ofctx.VarsFromCtx(r.Context()) + name := ofctx.URLParamFromCtx(r.Context(), "name") response := map[string]string{ - "hello": vars["name"], + "hello": name, } responseBytes, _ := json.Marshal(response) w.Header().Set("Content-type", "application/json") @@ -56,6 +56,8 @@ func hello(w http.ResponseWriter, r *http.Request) { } func hellov2(w http.ResponseWriter, r *http.Request) { + // keep for backward compatibility, same for example below + // suggest to use ofctx.URLParamFromCtx(...) to get vars vars := ofctx.VarsFromCtx(r.Context()) response := map[string]string{ "hello": vars["name"], @@ -66,16 +68,15 @@ func hellov2(w http.ResponseWriter, r *http.Request) { } func foo(ctx context.Context, ce cloudevents.Event) error { - vars := ofctx.VarsFromCtx(ctx) - msg := &Message{} err := json.Unmarshal(ce.Data(), msg) if err != nil { return err } + name := ofctx.URLParamFromCtx(ctx, "name") response := map[string]string{ - msg.Data: vars["name"], + msg.Data: name, } responseBytes, _ := json.Marshal(response) klog.Infof("cloudevent - Data: %s", string(responseBytes)) @@ -100,15 +101,15 @@ func foov2(ctx context.Context, ce cloudevents.Event) error { } func bar(ctx ofctx.Context, in []byte) (ofctx.Out, error) { - vars := ofctx.VarsFromCtx(ctx.GetNativeContext()) msg := &Message{} err := json.Unmarshal(in, msg) if err != nil { return ctx.ReturnOnInternalError(), err } + name := ofctx.URLParamFromCtx(ctx.GetNativeContext(), "name") response := map[string]string{ - msg.Data: vars["name"], + msg.Data: name, } responseBytes, _ := json.Marshal(response) return ctx.ReturnOnSuccess().WithData(responseBytes), nil diff --git a/test/declarative/sync-http-variables/verify-cloudevent-structured.sh b/test/declarative/sync-http-variables/verify-cloudevent-structured.sh index 8d5bc72..2935569 100644 --- a/test/declarative/sync-http-variables/verify-cloudevent-structured.sh +++ b/test/declarative/sync-http-variables/verify-cloudevent-structured.sh @@ -2,7 +2,7 @@ url=http://$1 while true; do - st=$(curl -s -o /dev/null -w "%{http_code}" "$url" -H "Content-Type: application/cloudevents+json" -d '{"specversion":"1.0","type":"dev.knative.samples.helloworld","source":"dev.knative.samples/helloworldsource","id":"536808d3-88be-4077-9d7a-a3f162705f79","data":{"data":"hello"}}') + st=$(curl -s -o /dev/null -w "%{http_code}" "$url" -H "Content-Type: application/cloudevents+json" -d '{"specversion":"1.0","type":"io.openfunction.samples.helloworld","source":"io.openfunction.samples/helloworldsource","id":"536808d3-88be-4077-9d7a-a3f162705f79","data":{"data":"hello"}}') if [ "$st" -eq 200 ]; then data_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=2 -l app="sync-http-variables" -c http | grep Data | awk '{ print $8 }' | yq -P '.' -) plugin_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=2 -l app="sync-http-variables" -c http | grep plugin | awk '{ print $8 }' | yq -P '.' -) diff --git a/test/kafka.yaml b/test/kafka.yaml index 8143206..ebc2054 100644 --- a/test/kafka.yaml +++ b/test/kafka.yaml @@ -5,7 +5,7 @@ metadata: namespace: default spec: kafka: - version: 3.1.0 + version: 3.4.0 replicas: 1 listeners: - name: plain @@ -22,7 +22,7 @@ spec: transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 - inter.broker.protocol.version: "3.1" + inter.broker.protocol.version: "3.4" storage: type: ephemeral zookeeper: diff --git a/test/pubsub/e2e.yaml b/test/pubsub/e2e.yaml index 5854f4e..64291cf 100644 --- a/test/pubsub/e2e.yaml +++ b/test/pubsub/e2e.yaml @@ -19,7 +19,7 @@ setup: - name: install kafka operator command: | helm repo add strimzi https://strimzi.io/charts/ - helm install kafka-operator -n default strimzi/strimzi-kafka-operator + helm install kafka-operator -n default strimzi/strimzi-kafka-operator --version 0.35.0 - name: install kafka path: ../kafka.yaml @@ -34,6 +34,14 @@ setup: - name: setup manifests path: manifests.yaml wait: + - namespace: default + resource: deployments + label-selector: app=pubsub-subscriber + for: condition=Available + - namespace: default + resource: deployments + label-selector: app=pubsub-publisher + for: condition=Available - namespace: default resource: pod label-selector: app=pubsub-subscriber diff --git a/test/sync-http/e2e.yaml b/test/sync-http/e2e.yaml index 4a6bf50..63b8054 100644 --- a/test/sync-http/e2e.yaml +++ b/test/sync-http/e2e.yaml @@ -10,6 +10,10 @@ setup: - name: setup manifests path: manifests.yaml wait: + - namespace: default + resource: deployments + label-selector: app=sync-http + for: condition=Available - namespace: default resource: pod label-selector: app=sync-http