From 798fc07f54e582f4f8fc3eb9b4aa27b08de380f4 Mon Sep 17 00:00:00 2001 From: Lize Date: Sun, 22 May 2022 11:22:27 +0800 Subject: [PATCH] feat: declarative function api (#48) * init declarative functions Signed-off-by: Lize Cai * add tests for declarative functions Signed-off-by: Lize Cai * update unit tests Signed-off-by: Lize Cai * update e2e files for declarative functions Signed-off-by: Lize Cai * add default http pattern for existing single function register method Signed-off-by: Lize Cai * fix declarative tests, update CMD in dockerfile Signed-off-by: Lize Cai --- .github/workflows/main.yml | 10 + .github/workflows/plugin_test.yaml | 2 + .gitignore | 4 +- context/context.go | 5 + framework/framework.go | 78 +++++++- functions/options.go | 11 ++ functions/registers.go | 37 ++++ internal/functions/functions.go | 174 ++++++++++++++++++ internal/functions/functions_test.go | 83 +++++++++ internal/registry/registry.go | 126 +++++++++++++ internal/registry/registry_test.go | 109 +++++++++++ runtime/async/async.go | 27 ++- runtime/knative/knative.go | 23 +-- runtime/runtime.go | 7 +- .../Dockerfile.sync.cloudevent | 12 ++ .../sync-cloudevent-multiple/cloudevent.go | 24 +++ .../sync-cloudevent-multiple/e2e.yaml | 48 +++++ .../expected.data.yaml | 2 + .../sync-cloudevent-multiple/main.go | 85 +++++++++ .../sync-cloudevent-multiple/manifests.yaml | 62 +++++++ .../sync-cloudevent-multiple/verify.sh | 17 ++ .../Dockerfile.sync.cloudevent | 12 ++ .../declarative/sync-cloudevent/cloudevent.go | 18 ++ test/declarative/sync-cloudevent/e2e.yaml | 46 +++++ .../sync-cloudevent/expected.data.yaml | 2 + test/declarative/sync-cloudevent/main.go | 85 +++++++++ .../sync-cloudevent/manifests.yaml | 62 +++++++ test/declarative/sync-cloudevent/verify.sh | 17 ++ .../sync-http-multiple/Dockerfile.sync.http | 12 ++ test/declarative/sync-http-multiple/e2e.yaml | 48 +++++ .../sync-http-multiple/expected.data-foo.yaml | 2 + .../expected.data-helloworld.yaml | 2 + test/declarative/sync-http-multiple/http.go | 31 ++++ test/declarative/sync-http-multiple/main.go | 84 +++++++++ .../sync-http-multiple/manifests.yaml | 62 +++++++ test/declarative/sync-http-multiple/verify.sh | 17 ++ .../sync-http/Dockerfile.sync.http | 12 ++ test/declarative/sync-http/e2e.yaml | 46 +++++ test/declarative/sync-http/expected.data.yaml | 2 + test/declarative/sync-http/http.go | 30 +++ test/declarative/sync-http/main.go | 84 +++++++++ test/declarative/sync-http/manifests.yaml | 62 +++++++ test/declarative/sync-http/verify.sh | 17 ++ 43 files changed, 1670 insertions(+), 29 deletions(-) create mode 100644 functions/options.go create mode 100644 functions/registers.go create mode 100644 internal/functions/functions.go create mode 100644 internal/functions/functions_test.go create mode 100644 internal/registry/registry.go create mode 100644 internal/registry/registry_test.go create mode 100644 test/declarative/sync-cloudevent-multiple/Dockerfile.sync.cloudevent create mode 100644 test/declarative/sync-cloudevent-multiple/cloudevent.go create mode 100644 test/declarative/sync-cloudevent-multiple/e2e.yaml create mode 100644 test/declarative/sync-cloudevent-multiple/expected.data.yaml create mode 100644 test/declarative/sync-cloudevent-multiple/main.go create mode 100644 test/declarative/sync-cloudevent-multiple/manifests.yaml create mode 100644 test/declarative/sync-cloudevent-multiple/verify.sh create mode 100644 test/declarative/sync-cloudevent/Dockerfile.sync.cloudevent create mode 100644 test/declarative/sync-cloudevent/cloudevent.go create mode 100644 test/declarative/sync-cloudevent/e2e.yaml create mode 100644 test/declarative/sync-cloudevent/expected.data.yaml create mode 100644 test/declarative/sync-cloudevent/main.go create mode 100644 test/declarative/sync-cloudevent/manifests.yaml create mode 100644 test/declarative/sync-cloudevent/verify.sh create mode 100644 test/declarative/sync-http-multiple/Dockerfile.sync.http create mode 100644 test/declarative/sync-http-multiple/e2e.yaml create mode 100644 test/declarative/sync-http-multiple/expected.data-foo.yaml create mode 100644 test/declarative/sync-http-multiple/expected.data-helloworld.yaml create mode 100644 test/declarative/sync-http-multiple/http.go create mode 100644 test/declarative/sync-http-multiple/main.go create mode 100644 test/declarative/sync-http-multiple/manifests.yaml create mode 100644 test/declarative/sync-http-multiple/verify.sh create mode 100644 test/declarative/sync-http/Dockerfile.sync.http create mode 100644 test/declarative/sync-http/e2e.yaml create mode 100644 test/declarative/sync-http/expected.data.yaml create mode 100644 test/declarative/sync-http/http.go create mode 100644 test/declarative/sync-http/main.go create mode 100644 test/declarative/sync-http/manifests.yaml create mode 100644 test/declarative/sync-http/verify.sh diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 81986cc..c555ef5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -8,6 +8,8 @@ on: - 'context/**' - 'plugin/**' - 'framework/**' + - 'functions/**' + - 'internal/**' - 'runtime/**' - 'test/**' - 'go.mod' @@ -45,6 +47,14 @@ jobs: e2e: "test/sync-http/e2e.yaml" - name: Sync Cloudevent e2e test e2e: "test/sync-cloudevent/e2e.yaml" + - name: Declarative Sync HTTP e2e test + e2e: "test/declarative/sync-http/e2e.yaml" + - name: Declarative Sync Cloudevent e2e test + e2e: "test/declarative/sync-cloudevent/e2e.yaml" + - name: Declarative multiple Sync HTTP e2e test + e2e: "test/declarative/sync-http-multiple/e2e.yaml" + - name: Declarative multiple Sync Cloudevent e2e test + e2e: "test/declarative/sync-cloudevent-multiple/e2e.yaml" steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/plugin_test.yaml b/.github/workflows/plugin_test.yaml index 2164418..5139d5d 100644 --- a/.github/workflows/plugin_test.yaml +++ b/.github/workflows/plugin_test.yaml @@ -8,6 +8,8 @@ on: - 'context/**' - 'plugin/**' - 'framework/**' + - 'functions/**' + - 'internal/**' - 'runtime/**' - 'test/**' - 'go.mod' diff --git a/.gitignore b/.gitignore index 5531298..5803bd0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ # IDE .idea/ -.vscode/ \ No newline at end of file +.vscode/ + +bin/ \ No newline at end of file diff --git a/context/context.go b/context/context.go index 805a8e4..ca98582 100644 --- a/context/context.go +++ b/context/context.go @@ -48,6 +48,7 @@ const ( Success = 200 InternalError = 500 defaultPort = "8080" + defaultHttpPattern = "/" daprSidecarGRPCPort = "50001" TracingProviderSkywalking = "skywalking" TracingProviderOpentelemetry = "opentelemetry" @@ -796,6 +797,10 @@ func parseContext() (*FunctionContext, error) { } } + if ctx.HttpPattern == "" { + ctx.HttpPattern = defaultHttpPattern + } + // When using self-hosted mode, configure the client port via env, // refer to https://docs.dapr.io/reference/environment/ port := os.Getenv("DAPR_GRPC_PORT") diff --git a/framework/framework.go b/framework/framework.go index f132d40..b3032d1 100644 --- a/framework/framework.go +++ b/framework/framework.go @@ -4,11 +4,14 @@ import ( "context" "errors" "net/http" + "os" cloudevents "github.com/cloudevents/sdk-go/v2" "k8s.io/klog/v2" ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/internal/functions" + "github.com/OpenFunction/functions-framework-go/internal/registry" "github.com/OpenFunction/functions-framework-go/plugin" plgExample "github.com/OpenFunction/functions-framework-go/plugin/plugin-example" "github.com/OpenFunction/functions-framework-go/plugin/skywalking" @@ -23,6 +26,7 @@ type functionsFrameworkImpl struct { postPlugins []plugin.Plugin pluginMap map[string]plugin.Plugin runtime runtime.Interface + registry *registry.Registry } // Framework is the interface for the function conversion. @@ -36,6 +40,9 @@ type Framework interface { func NewFramework() (*functionsFrameworkImpl, error) { fwk := &functionsFrameworkImpl{} + // Set the function registry + fwk.registry = registry.Default() + // Parse OpenFunction FunctionContext if ctx, err := ofctx.GetRuntimeContext(); err != nil { klog.Errorf("failed to parse OpenFunction FunctionContext: %v\n", err) @@ -59,17 +66,29 @@ func NewFramework() (*functionsFrameworkImpl, error) { func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{}) error { if fnHTTP, ok := fn.(func(http.ResponseWriter, *http.Request)); ok { - if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnHTTP); err != nil { + rf, err := functions.New(functions.WithFunctionName(fwk.funcContext.GetName()), functions.WithHTTP(fnHTTP), functions.WithFunctionPath(fwk.funcContext.GetHttpPattern())) + if err != nil { + klog.Errorf("failed to register function: %v", err) + } + if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil { klog.Errorf("failed to register function: %v", err) return err } } else if fnOpenFunction, ok := fn.(func(ofctx.Context, []byte) (ofctx.Out, error)); ok { - if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnOpenFunction); err != nil { + rf, err := functions.New(functions.WithFunctionName(fwk.funcContext.GetName()), functions.WithOpenFunction(fnOpenFunction), functions.WithFunctionPath(fwk.funcContext.GetHttpPattern())) + if err != nil { + klog.Errorf("failed to register function: %v", err) + } + if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil { klog.Errorf("failed to register function: %v", err) return err } } else if fnCloudEvent, ok := fn.(func(context.Context, cloudevents.Event) error); ok { - if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnCloudEvent); err != nil { + rf, err := functions.New(functions.WithFunctionName(fwk.funcContext.GetName()), functions.WithCloudEvent(fnCloudEvent), functions.WithFunctionPath(fwk.funcContext.GetHttpPattern())) + if err != nil { + klog.Errorf("failed to register function: %v", err) + } + if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil { klog.Errorf("failed to register function: %v", err) return err } @@ -82,6 +101,56 @@ func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{}) } func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error { + + target := os.Getenv("FUNCTION_TARGET") + + // if FUNCTION_TARGET is provided + if len(target) > 0 { + if fn, ok := fwk.registry.GetRegisteredFunction(target); ok { + klog.Infof("registering function: %s on path: %s", target, fn.GetPath()) + switch fn.GetFunctionType() { + case functions.HTTPType: + fwk.Register(ctx, fn.GetHTTPFunction()) + case functions.CloudEventType: + fwk.Register(ctx, fn.GetCloudEventFunction()) + case functions.OpenFunctionType: + fwk.Register(ctx, fn.GetOpenFunctionFunction()) + } + } else { + klog.Errorf("function not found: %s", target) + } + } else { + // if FUNCTION_TARGET is not provided but user uses declarative function, by default all registered functions will be deployed. + funcNames := fwk.registry.GetFunctionNames() + if len(funcNames) > 1 && fwk.funcContext.GetRuntime() == ofctx.Async { + return errors.New("only one function is allowed in async runtime") + } else if len(funcNames) > 0 { + klog.Info("no 'FUNCTION_TARGET' is provided, register all the functions in the registry") + for _, name := range funcNames { + if rf, ok := fwk.registry.GetRegisteredFunction(name); ok { + klog.Infof("registering function: %s on path: %s", rf.GetName(), rf.GetPath()) + switch rf.GetFunctionType() { + case functions.HTTPType: + if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil { + klog.Errorf("failed to register function: %v", err) + return err + } + case functions.CloudEventType: + if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil { + klog.Errorf("failed to register function: %v", err) + return err + } + case functions.OpenFunctionType: + if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil { + klog.Errorf("failed to register function: %v", err) + return err + } + } + } + } + } + } + err := fwk.runtime.Start(ctx) if err != nil { klog.Error("failed to start runtime service") @@ -136,13 +205,12 @@ func createRuntime(fwk *functionsFrameworkImpl) error { rt := fwk.funcContext.GetRuntime() port := fwk.funcContext.GetPort() pattern := fwk.funcContext.GetHttpPattern() - switch rt { case ofctx.Knative: fwk.runtime = knative.NewKnativeRuntime(port, pattern) return nil case ofctx.Async: - fwk.runtime, err = async.NewAsyncRuntime(port) + fwk.runtime, err = async.NewAsyncRuntime(port, pattern) if err != nil { return err } diff --git a/functions/options.go b/functions/options.go new file mode 100644 index 0000000..85c450f --- /dev/null +++ b/functions/options.go @@ -0,0 +1,11 @@ +package functions + +import ( + "github.com/OpenFunction/functions-framework-go/internal/functions" +) + +type FunctionOption = functions.FunctionOption + +var ( + WithFunctionPath = functions.WithFunctionPath +) diff --git a/functions/registers.go b/functions/registers.go new file mode 100644 index 0000000..2454ce6 --- /dev/null +++ b/functions/registers.go @@ -0,0 +1,37 @@ +// Package functions provides a way to declaratively register functions +// that can be used to handle incoming requests. +package functions + +import ( + "context" + "log" + "net/http" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/internal/registry" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +// HTTP registers an HTTP function that becomes the function handler served +// at "/" when environment variable `FUNCTION_TARGET=name` +func HTTP(name string, fn func(http.ResponseWriter, *http.Request), options ...FunctionOption) { + if err := registry.Default().RegisterHTTP(name, fn, options...); err != nil { + log.Fatalf("failure to register function: %s", err) + } +} + +// CloudEvent registers a CloudEvent function that becomes the function handler +// served at "/" when environment variable `FUNCTION_TARGET=name` +func CloudEvent(name string, fn func(context.Context, cloudevents.Event) error, options ...FunctionOption) { + if err := registry.Default().RegisterCloudEvent(name, fn, options...); err != nil { + log.Fatalf("failure to register function: %s", err) + } +} + +// OpenFunction registers a OpenFunction function that becomes the function handler +// served at "/" when environment variable `FUNCTION_TARGET=name` +func OpenFunction(name string, fn func(ofctx.Context, []byte) (ofctx.Out, error), options ...FunctionOption) { + if err := registry.Default().RegisterOpenFunction(name, fn, options...); err != nil { + log.Fatalf("failure to register function: %s", err) + } +} diff --git a/internal/functions/functions.go b/internal/functions/functions.go new file mode 100644 index 0000000..a1aafe0 --- /dev/null +++ b/internal/functions/functions.go @@ -0,0 +1,174 @@ +package functions + +import ( + "context" + "errors" + "fmt" + "net/http" + "regexp" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + HTTPType = "http" + CloudEventType = "cloudevent" + OpenFunctionType = "openfunction" + defaultPath = "/" +) + +// RegisteredFunction represents a function that has been +// registered with the registry. +type RegisteredFunction struct { + functionName string // The name of the function + functionPath string // The path of the function, default is '/' + functionType string // The type of the function, not using it currently + httpFn func(http.ResponseWriter, *http.Request) // Optional: The user's HTTP function + cloudEventFn func(context.Context, cloudevents.Event) error // Optional: The user's CloudEvent function + openFunctionFn func(ofctx.Context, []byte) (ofctx.Out, error) // Optional: The user's OpenFunction function +} + +type FunctionOption func() (func(*RegisteredFunction), error) + +func (rf *RegisteredFunction) setup(options ...FunctionOption) error { + if rf == nil { + return nil + } + for _, option := range options { + if option == nil { + continue + } + setter, err := option() + if err != nil { + return err + } + if setter != nil { + setter(rf) + } + } + + if rf.GetName() == "" { + return errors.New("No function name is registered") + } + + if rf.GetFunctionType() == "" { + return errors.New("No function is registered") + } + + return nil +} + +func (rf *RegisteredFunction) GetName() string { + return rf.functionName +} + +func (rf *RegisteredFunction) GetPath() string { + return rf.functionPath +} + +func (rf *RegisteredFunction) GetFunctionType() string { + return rf.functionType +} + +func (rf *RegisteredFunction) GetHTTPFunction() func(http.ResponseWriter, *http.Request) { + return rf.httpFn +} + +func (rf *RegisteredFunction) GetCloudEventFunction() func(context.Context, cloudevents.Event) error { + return rf.cloudEventFn +} + +func (rf *RegisteredFunction) GetOpenFunctionFunction() func(ofctx.Context, []byte) (ofctx.Out, error) { + return rf.openFunctionFn +} + +// failedOption - helper to expose error from option builder +func failedOption(err error) FunctionOption { + return func() (func(*RegisteredFunction), error) { + return nil, err + } +} + +// properOption - helper to expose valid setter from option builder +func properOption(setter func(*RegisteredFunction)) FunctionOption { + return func() (func(*RegisteredFunction), error) { + return setter, nil + } +} + +func New(options ...FunctionOption) (*RegisteredFunction, error) { + rf := &RegisteredFunction{functionPath: defaultPath} + + if err := rf.setup(options...); err != nil { + return nil, err + } + + return rf, nil +} + +func WithFunctionName(name string) FunctionOption { + if !isValidFunctionName(name) { + return failedOption(fmt.Errorf("Invalid function name: %s", name)) + } + return properOption(func(rf *RegisteredFunction) { + rf.functionName = name + }) +} + +// Returns true if the function name is valid +// - must contain only alphanumeric, numbers, or dash characters +// - must be <= 63 characters +// - must start with a letter +// - must end with a letter or number +func isValidFunctionName(name string) bool { + match, _ := regexp.MatchString("^[A-Za-z](?:[-_A-Za-z0-9]{0,61}[A-Za-z0-9])?$", name) + return match +} + +func WithFunctionPath(path string) FunctionOption { + if len(path) == 0 { + return failedOption(errors.New("Empty function path")) + } + + if path[0] != '/' { + return failedOption(fmt.Errorf("Function path must start with '/': %s", path)) + } + + return properOption(func(rf *RegisteredFunction) { + rf.functionPath = path + }) +} + +func WithHTTP(fn func(http.ResponseWriter, *http.Request)) FunctionOption { + if fn == nil { + return failedOption(errors.New("Function is nil")) + } + + return properOption(func(rf *RegisteredFunction) { + rf.functionType = HTTPType + rf.httpFn = fn + }) +} + +func WithCloudEvent(fn func(context.Context, cloudevents.Event) error) FunctionOption { + if fn == nil { + return failedOption(errors.New("Function is nil")) + } + + return properOption(func(rf *RegisteredFunction) { + rf.functionType = CloudEventType + rf.cloudEventFn = fn + }) +} + +func WithOpenFunction(fn func(ofctx.Context, []byte) (ofctx.Out, error)) FunctionOption { + if fn == nil { + return failedOption(errors.New("Function is nil")) + } + + return properOption(func(rf *RegisteredFunction) { + rf.functionType = OpenFunctionType + rf.openFunctionFn = fn + }) +} diff --git a/internal/functions/functions_test.go b/internal/functions/functions_test.go new file mode 100644 index 0000000..b37ac0a --- /dev/null +++ b/internal/functions/functions_test.go @@ -0,0 +1,83 @@ +package functions + +import ( + "context" + "fmt" + "net/http" + "testing" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func TestNewHTTPFunction(t *testing.T) { + + name := "foo" + path := "/foo" + fn, err := New(WithFunctionName(name), WithFunctionPath(path), WithHTTP(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello World!") + })) + if err != nil { + t.Errorf("Fail to Create http function with name: %s, path: %s", name, path) + } + + if fn.GetFunctionType() != HTTPType { + t.Errorf("Expected function type to be %s, got %s", HTTPType, fn.GetFunctionType()) + } + + if fn.GetName() != name { + t.Errorf("Expected function name to be %s, got %s", name, fn.GetName()) + } + + if fn.GetPath() != path { + t.Errorf("Expected function path to be %s, got %s", path, fn.GetPath()) + } +} + +func TestNewCloudEventFunction(t *testing.T) { + + name := "foo" + path := "/foo" + fn, err := New(WithFunctionName(name), WithFunctionPath(path), WithCloudEvent(func(context.Context, cloudevents.Event) error { + return nil + })) + if err != nil { + t.Errorf("Fail to Create cloudevent function with name: %s, path: %s, error: %s", name, path, err) + } + + if fn.GetFunctionType() != CloudEventType { + t.Errorf("Expected function type to be %s, got %s", CloudEventType, fn.GetFunctionType()) + } + + if fn.GetName() != name { + t.Errorf("Expected function name to be %s, got %s", name, fn.GetName()) + } + + if fn.GetPath() != path { + t.Errorf("Expected function path to be %s, got %s", path, fn.GetPath()) + } +} + +func TestNewOpenFunctionFunction(t *testing.T) { + + name := "foo" + path := "/foo" + fn, err := New(WithFunctionName(name), WithFunctionPath(path), WithOpenFunction(func(ctx ofctx.Context, in []byte) (ofctx.Out, error) { + return ctx.ReturnOnSuccess(), nil + })) + if err != nil { + t.Errorf("Fail to Create openfunction function with name: %s, path: %s", name, path) + } + + if fn.GetFunctionType() != OpenFunctionType { + t.Errorf("Expected function type to be %s, got %s", OpenFunctionType, fn.GetFunctionType()) + } + + if fn.GetName() != name { + t.Errorf("Expected function name to be %s, got %s", name, fn.GetName()) + } + + if fn.GetPath() != path { + t.Errorf("Expected function path to be %s, got %s", path, fn.GetPath()) + } +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go new file mode 100644 index 0000000..659cdde --- /dev/null +++ b/internal/registry/registry.go @@ -0,0 +1,126 @@ +package registry + +import ( + "context" + "fmt" + "net/http" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/internal/functions" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +// Registry is a registry of functions. +type Registry struct { + functions map[string]*functions.RegisteredFunction + paths map[string]string +} + +var defaultInstance = New() + +// Default returns the default, singleton registry instance. +func Default() *Registry { + return defaultInstance +} + +func New() *Registry { + return &Registry{ + functions: map[string]*functions.RegisteredFunction{}, + paths: map[string]string{}, + } +} + +func (r *Registry) IsEmpty() bool { + return len(r.functions) == 0 +} + +func (r *Registry) GetFunctionNames() []string { + funcNames := []string{} + for k := range r.functions { + funcNames = append(funcNames, k) + } + return funcNames +} + +// RegisterHTTP a HTTP function with a given name +func (r *Registry) RegisterHTTP(name string, fn func(http.ResponseWriter, *http.Request), options ...functions.FunctionOption) error { + if _, ok := r.functions[name]; ok { + return fmt.Errorf("function name already registered: %s", name) + } + + // append at the end to overwrite any option from user + options = append(options, functions.WithFunctionName(name)) + options = append(options, functions.WithHTTP(fn)) + + function, err := functions.New(options...) + if err != nil { + return err + } + + path := function.GetPath() + if _, ok := r.paths[path]; ok { + return fmt.Errorf("function path already registered: %s", path) + } + + r.functions[name] = function + r.paths[path] = name + return nil +} + +// RegistryCloudEvent a CloudEvent function with a given name +func (r *Registry) RegisterCloudEvent(name string, fn func(context.Context, cloudevents.Event) error, options ...functions.FunctionOption) error { + if _, ok := r.functions[name]; ok { + return fmt.Errorf("function name already registered: %s", name) + } + + // append at the end to overwrite any option from user + options = append(options, functions.WithFunctionName(name)) + options = append(options, functions.WithCloudEvent(fn)) + + function, err := functions.New(options...) + if err != nil { + return err + } + + path := function.GetPath() + if _, ok := r.paths[path]; ok { + return fmt.Errorf("function path already registered: %s", path) + } + + r.functions[name] = function + r.paths[path] = name + return nil +} + +// RegisterOpenFunction a OpenFunction function with a given name +func (r *Registry) RegisterOpenFunction(name string, fn func(ofctx.Context, []byte) (ofctx.Out, error), options ...functions.FunctionOption) error { + + if _, ok := r.functions[name]; ok { + return fmt.Errorf("function name already registered: %s", name) + } + + // append at the end to overwrite any option from user + options = append(options, functions.WithFunctionName(name)) + options = append(options, functions.WithOpenFunction(fn)) + + function, err := functions.New(options...) + if err != nil { + return err + } + + path := function.GetPath() + if _, ok := r.paths[path]; ok { + return fmt.Errorf("function path already registered: %s", path) + } + + r.functions[name] = function + r.paths[path] = name + + return nil +} + +// GetRegisteredFunction a registered function by name +func (r *Registry) GetRegisteredFunction(name string) (*functions.RegisteredFunction, bool) { + fn, ok := r.functions[name] + return fn, ok +} diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go new file mode 100644 index 0000000..6296add --- /dev/null +++ b/internal/registry/registry_test.go @@ -0,0 +1,109 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package registry + +import ( + "context" + "fmt" + "net/http" + "testing" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/internal/functions" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func TestRegisterHTTP(t *testing.T) { + registry := New() + registry.RegisterHTTP("httpfn", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello World!") + }) + + fn, ok := registry.GetRegisteredFunction("httpfn") + if !ok { + t.Fatalf("Expected function to be registered") + } + if fn.GetName() != "httpfn" { + t.Errorf("Expected function name to be 'httpfn', got %s", fn.GetName()) + } +} + +func TestRegisterCloudEvent(t *testing.T) { + registry := New() + registry.RegisterCloudEvent("cefn", func(context.Context, cloudevents.Event) error { + return nil + }) + + fn, ok := registry.GetRegisteredFunction("cefn") + if !ok { + t.Fatalf("Expected function to be registered") + } + if fn.GetName() != "cefn" { + t.Errorf("Expected function name to be 'cefn', got %s", fn.GetName()) + } +} + +func TestRegisterOpenFunction(t *testing.T) { + registry := New() + registry.RegisterOpenFunction("ofnfn", func(ctx ofctx.Context, in []byte) (ofctx.Out, error) { + return ctx.ReturnOnSuccess(), nil + }) + + fn, ok := registry.GetRegisteredFunction("ofnfn") + if !ok { + t.Fatalf("Expected function to be registered") + } + if fn.GetName() != "ofnfn" { + t.Errorf("Expected function name to be 'ofnfn', got %s", fn.GetName()) + } +} + +func TestRegisterMultipleFunctions(t *testing.T) { + registry := New() + if err := registry.RegisterHTTP("multifn1", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello World!") + }, functions.WithFunctionPath("/multifn1")); err != nil { + t.Error("Expected \"multifn1\" function to be registered") + } + if err := registry.RegisterHTTP("multifn2", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello World 2!") + }, functions.WithFunctionPath("/multifn2")); err != nil { + t.Error("Expected \"multifn2\" function to be registered") + } + if err := registry.RegisterCloudEvent("multifn3", func(context.Context, cloudevents.Event) error { + return nil + }, functions.WithFunctionPath("/multifn3")); err != nil { + t.Error("Expected \"multifn3\" function to be registered") + } + if err := registry.RegisterOpenFunction("multifn4", func(ctx ofctx.Context, in []byte) (ofctx.Out, error) { + return ctx.ReturnOnSuccess(), nil + }, functions.WithFunctionPath("/multifn4")); err != nil { + t.Error("Expected \"multifn4\" function to be registered") + } +} + +func TestRegisterMultipleFunctionsError(t *testing.T) { + registry := New() + if err := registry.RegisterHTTP("samename", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello World!") + }); err != nil { + t.Error("Expected no error registering function") + } + + if err := registry.RegisterHTTP("samename", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Hello World 2!") + }); err == nil { + t.Error("Expected error registering function with same name") + } +} diff --git a/runtime/async/async.go b/runtime/async/async.go index 6ff2f05..2d280be 100644 --- a/runtime/async/async.go +++ b/runtime/async/async.go @@ -4,27 +4,34 @@ import ( "context" "errors" "fmt" - "net/http" "os" "strings" - cloudevents "github.com/cloudevents/sdk-go/v2" dapr "github.com/dapr/go-sdk/service/common" daprd "github.com/dapr/go-sdk/service/grpc" "k8s.io/klog/v2" ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/internal/functions" "github.com/OpenFunction/functions-framework-go/plugin" "github.com/OpenFunction/functions-framework-go/runtime" ) +const ( + defaultPattern = "/" +) + type Runtime struct { port string + pattern string handler dapr.Service grpcHander *FakeServer } -func NewAsyncRuntime(port string) (*Runtime, error) { +func NewAsyncRuntime(port string, pattern string) (*Runtime, error) { + if pattern == "" { + pattern = defaultPattern + } if testMode := os.Getenv(ofctx.TestModeEnvName); testMode == ofctx.TestModeOn { handler, grpcHandler, err := NewFakeService(fmt.Sprintf(":%s", port)) if err != nil { @@ -33,6 +40,7 @@ func NewAsyncRuntime(port string) (*Runtime, error) { } return &Runtime{ port: port, + pattern: pattern, handler: handler, grpcHander: grpcHandler, }, nil @@ -44,6 +52,7 @@ func NewAsyncRuntime(port string) (*Runtime, error) { } return &Runtime{ port: port, + pattern: pattern, handler: handler, grpcHander: nil, }, nil @@ -59,7 +68,7 @@ func (r *Runtime) RegisterHTTPFunction( ctx ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(http.ResponseWriter, *http.Request), + rf *functions.RegisteredFunction, ) error { return errors.New("async runtime cannot register http function") } @@ -69,7 +78,7 @@ func (r *Runtime) RegisterCloudEventFunction( funcContext ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(context.Context, cloudevents.Event) error, + rf *functions.RegisteredFunction, ) error { return errors.New("async runtime cannot register cloudevent function") } @@ -78,7 +87,7 @@ func (r *Runtime) RegisterOpenFunction( ctx ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(ofctx.Context, []byte) (ofctx.Out, error), + rf *functions.RegisteredFunction, ) error { // Register the asynchronous functions (based on the Dapr runtime) return func(f func(ofctx.Context, []byte) (ofctx.Out, error)) error { @@ -96,7 +105,7 @@ func (r *Runtime) RegisterOpenFunction( funcErr = r.handler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) rm.FuncContext.SetEvent(name, in) - rm.FunctionRunWrapperWithHooks(fn) + rm.FunctionRunWrapperWithHooks(rf.GetOpenFunctionFunction()) switch rm.FuncOut.GetCode() { case ofctx.Success: @@ -118,7 +127,7 @@ func (r *Runtime) RegisterOpenFunction( funcErr = r.handler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) rm.FuncContext.SetEvent(name, e) - rm.FunctionRunWrapperWithHooks(fn) + rm.FunctionRunWrapperWithHooks(rf.GetOpenFunctionFunction()) switch rm.FuncOut.GetCode() { case ofctx.Success: @@ -160,7 +169,7 @@ func (r *Runtime) RegisterOpenFunction( err := errors.New("no inputs defined for the function") klog.Errorf("failed to register function: %v\n", err) return err - }(fn) + }(rf.GetOpenFunctionFunction()) } func (r *Runtime) Name() ofctx.Runtime { diff --git a/runtime/knative/knative.go b/runtime/knative/knative.go index 9413be6..147869e 100644 --- a/runtime/knative/knative.go +++ b/runtime/knative/knative.go @@ -12,6 +12,7 @@ import ( "k8s.io/klog/v2" ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/internal/functions" "github.com/OpenFunction/functions-framework-go/plugin" "github.com/OpenFunction/functions-framework-go/runtime" ) @@ -26,8 +27,8 @@ const ( type Runtime struct { port string - handler *http.ServeMux pattern string + handler *http.ServeMux } func NewKnativeRuntime(port string, pattern string) *Runtime { @@ -36,8 +37,8 @@ func NewKnativeRuntime(port string, pattern string) *Runtime { } return &Runtime{ port: port, - handler: http.DefaultServeMux, pattern: pattern, + handler: http.DefaultServeMux, } } @@ -51,17 +52,17 @@ func (r *Runtime) RegisterOpenFunction( ctx ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(ofctx.Context, []byte) (ofctx.Out, error), + rf *functions.RegisteredFunction, ) error { // Initialize dapr client if it is nil ctx.InitDaprClientIfNil() // Register the synchronous function (based on Knaitve runtime) - r.handler.HandleFunc(r.pattern, func(w http.ResponseWriter, r *http.Request) { + r.handler.HandleFunc(rf.GetPath(), func(w http.ResponseWriter, r *http.Request) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) rm.FuncContext.SetSyncRequest(w, r) defer RecoverPanicHTTP(w, "Function panic") - rm.FunctionRunWrapperWithHooks(fn) + rm.FunctionRunWrapperWithHooks(rf.GetOpenFunctionFunction()) switch rm.FuncOut.GetCode() { case ofctx.Success: @@ -82,13 +83,13 @@ func (r *Runtime) RegisterHTTPFunction( ctx ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(http.ResponseWriter, *http.Request), + rf *functions.RegisteredFunction, ) error { - r.handler.HandleFunc(r.pattern, func(w http.ResponseWriter, r *http.Request) { + r.handler.HandleFunc(rf.GetPath(), func(w http.ResponseWriter, r *http.Request) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) rm.FuncContext.SetSyncRequest(w, r) defer RecoverPanicHTTP(w, "Function panic") - rm.FunctionRunWrapperWithHooks(fn) + rm.FunctionRunWrapperWithHooks(rf.GetHTTPFunction()) }) return nil } @@ -98,7 +99,7 @@ func (r *Runtime) RegisterCloudEventFunction( funcContext ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(context.Context, cloudevents.Event) error, + rf *functions.RegisteredFunction, ) error { p, err := cloudevents.NewHTTP() if err != nil { @@ -109,7 +110,7 @@ func (r *Runtime) RegisterCloudEventFunction( handleFn, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(ctx context.Context, ce cloudevents.Event) error { rm := runtime.NewRuntimeManager(funcContext, prePlugins, postPlugins) rm.FuncContext.SetEvent("", &ce) - rm.FunctionRunWrapperWithHooks(fn) + rm.FunctionRunWrapperWithHooks(rf.GetCloudEventFunction()) return rm.FuncContext.GetError() }) @@ -117,7 +118,7 @@ func (r *Runtime) RegisterCloudEventFunction( klog.Errorf("failed to create handler: %v\n", err) return err } - r.handler.Handle(r.pattern, handleFn) + r.handler.Handle(rf.GetPath(), handleFn) return nil } diff --git a/runtime/runtime.go b/runtime/runtime.go index 1898132..71f6664 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -9,6 +9,7 @@ import ( "k8s.io/klog/v2" ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/internal/functions" "github.com/OpenFunction/functions-framework-go/plugin" ) @@ -18,20 +19,20 @@ type Interface interface { ctx ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(http.ResponseWriter, *http.Request), + rf *functions.RegisteredFunction, ) error RegisterOpenFunction( ctx ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(ofctx.Context, []byte) (ofctx.Out, error), + rf *functions.RegisteredFunction, ) error RegisterCloudEventFunction( ctx context.Context, funcContex ofctx.RuntimeContext, prePlugins []plugin.Plugin, postPlugins []plugin.Plugin, - fn func(context.Context, cloudevents.Event) error, + rf *functions.RegisteredFunction, ) error Name() ofctx.Runtime GetHandler() interface{} diff --git a/test/declarative/sync-cloudevent-multiple/Dockerfile.sync.cloudevent b/test/declarative/sync-cloudevent-multiple/Dockerfile.sync.cloudevent new file mode 100644 index 0000000..cdf5aab --- /dev/null +++ b/test/declarative/sync-cloudevent-multiple/Dockerfile.sync.cloudevent @@ -0,0 +1,12 @@ +FROM golang:1.16-alpine + +ENV GO111MODULE=on +ENV GOPROXY=https://goproxy.cn,direct + +WORKDIR /ff + +ADD . /ff + +ENTRYPOINT ["go"] + +CMD ["run", "test/declarative/sync-cloudevent-multiple/main.go", "test/declarative/sync-cloudevent-multiple/cloudevent.go"] \ No newline at end of file diff --git a/test/declarative/sync-cloudevent-multiple/cloudevent.go b/test/declarative/sync-cloudevent-multiple/cloudevent.go new file mode 100644 index 0000000..2145bb9 --- /dev/null +++ b/test/declarative/sync-cloudevent-multiple/cloudevent.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + + "github.com/OpenFunction/functions-framework-go/functions" + cloudevents "github.com/cloudevents/sdk-go/v2" + "k8s.io/klog/v2" +) + +func init() { + functions.CloudEvent("HelloWorld", HelloWorld, functions.WithFunctionPath("/helloworld")) + functions.CloudEvent("Foo", Foo, functions.WithFunctionPath("/foo")) +} + +func HelloWorld(ctx context.Context, ce cloudevents.Event) error { + klog.Infof("cloudevent - Data: %s", ce.Data()) + return nil +} + +func Foo(ctx context.Context, ce cloudevents.Event) error { + klog.Infof("cloudevent - Data: %s", ce.Data()) + return nil +} diff --git a/test/declarative/sync-cloudevent-multiple/e2e.yaml b/test/declarative/sync-cloudevent-multiple/e2e.yaml new file mode 100644 index 0000000..6f72eb2 --- /dev/null +++ b/test/declarative/sync-cloudevent-multiple/e2e.yaml @@ -0,0 +1,48 @@ +setup: + env: kind + file: ../../kind.yaml + steps: + - name: build and load image + command: | + docker build -t sync_e2e_cloudevent:latest -f test/declarative/sync-cloudevent-multiple/Dockerfile.sync.cloudevent . + kind load docker-image sync_e2e_cloudevent:latest + + - name: setup manifests + path: manifests.yaml + wait: + - namespace: default + resource: pod + label-selector: app=sync-cloudevent + for: condition=Ready + +# kind: +# expose-ports: +# - namespace: default +# resource: services/sync +# port: 12345 + + timeout: 30m + +cleanup: + # always never success failure + on: success + +#trigger: +# action: "http" +# interval: 3s +# times: 10 +# url: http://127.0.0.1:80 +# method: GET + +verify: + # verify with retry strategy + retry: + # max retry count + count: 60 + # the interval between two attempts, e.g. 10s, 1m. + interval: 10s + cases: + - query: bash test/declarative/sync-cloudevent-multiple/verify.sh 127.0.0.1:80/helloworld | yq + expected: expected.data.yaml + - query: bash test/declarative/sync-cloudevent-multiple/verify.sh 127.0.0.1:80/foo | yq + expected: expected.data.yaml \ No newline at end of file diff --git a/test/declarative/sync-cloudevent-multiple/expected.data.yaml b/test/declarative/sync-cloudevent-multiple/expected.data.yaml new file mode 100644 index 0000000..8ad232c --- /dev/null +++ b/test/declarative/sync-cloudevent-multiple/expected.data.yaml @@ -0,0 +1,2 @@ +hello: world +sum: 2 \ No newline at end of file diff --git a/test/declarative/sync-cloudevent-multiple/main.go b/test/declarative/sync-cloudevent-multiple/main.go new file mode 100644 index 0000000..e2bc72e --- /dev/null +++ b/test/declarative/sync-cloudevent-multiple/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + + "github.com/fatih/structs" + "k8s.io/klog/v2" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/framework" + "github.com/OpenFunction/functions-framework-go/plugin" +) + +func main() { + ctx := context.Background() + fwk, err := framework.NewFramework() + if err != nil { + klog.Exit(err) + } + fwk.RegisterPlugins(getLocalPlugins()) + + if err := fwk.Start(ctx); err != nil { + klog.Exit(err) + } +} + +func getLocalPlugins() map[string]plugin.Plugin { + localPlugins := map[string]plugin.Plugin{ + Name: New(), + } + + if len(localPlugins) == 0 { + return nil + } else { + return localPlugins + } +} + +// Plugin + +const ( + Name = "plugin-custom" + Version = "v1" +) + +type PluginCustom struct { + PluginName string + PluginVersion string + StateC int64 +} + +var _ plugin.Plugin = &PluginCustom{} + +func New() *PluginCustom { + return &PluginCustom{ + StateC: int64(0), + } +} + +func (p *PluginCustom) Name() string { + return Name +} + +func (p *PluginCustom) Version() string { + return Version +} + +func (p *PluginCustom) Init() plugin.Plugin { + return New() +} + +func (p *PluginCustom) ExecPreHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + p.StateC++ + return nil +} + +func (p *PluginCustom) ExecPostHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + return nil +} + +func (p *PluginCustom) Get(fieldName string) (interface{}, bool) { + plgMap := structs.Map(p) + value, ok := plgMap[fieldName] + return value, ok +} diff --git a/test/declarative/sync-cloudevent-multiple/manifests.yaml b/test/declarative/sync-cloudevent-multiple/manifests.yaml new file mode 100644 index 0000000..3208b71 --- /dev/null +++ b/test/declarative/sync-cloudevent-multiple/manifests.yaml @@ -0,0 +1,62 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: sync-cloudevent + labels: + app: sync-cloudevent + case: sync +spec: + replicas: 1 + selector: + matchLabels: + app: sync-cloudevent + case: sync + template: + metadata: + labels: + app: sync-cloudevent + case: sync + spec: + containers: + - name: cloudevent + image: sync_e2e_cloudevent:latest + imagePullPolicy: IfNotPresent + env: + - name: FUNC_CONTEXT + value: | + { + "name": "sync-cloudevent", + "version": "v1", + "requestID": "a0f2ad8d-5062-4812-91e9-95416489fb01", + "port": "8080", + "prePlugins": ["plugin-custom", "plugin-example"], + "postPlugins": ["plugin-custom", "plugin-example"], + "inputs": {}, + "outputs": {}, + "runtime": "Knative" + } + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: POD_NAMESPACE + value: default + ports: + - containerPort: 8080 + name: function-port + protocol: TCP +--- +apiVersion: v1 +kind: Service +metadata: + name: sync +spec: + type: NodePort + selector: + app: sync-cloudevent + ports: + - protocol: TCP + port: 12345 + targetPort: 8080 + nodePort: 31234 diff --git a/test/declarative/sync-cloudevent-multiple/verify.sh b/test/declarative/sync-cloudevent-multiple/verify.sh new file mode 100644 index 0000000..3ea5a46 --- /dev/null +++ b/test/declarative/sync-cloudevent-multiple/verify.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +url=http://$1 +while true; do + st=$(curl -s -o /dev/null -w "%{http_code}" "$url" -H "Ce-Specversion: 1.0" -H "Ce-Type: io.openfunction.samples.helloworld" -H "Ce-Source: io.openfunction.samples/helloworldsource" -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" -H "Content-Type: application/json" -d '{"hello":"world"}') + if [ "$st" -eq 200 ]; then + data_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=2 -l app="sync-cloudevent" -c cloudevent | grep Data | awk '{ print $8 }' | yq -P '.' -) + plugin_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=2 -l app="sync-cloudevent" -c cloudevent | grep plugin | awk '{ print $8 }' | yq -P '.' -) + break + else + sleep 1 + continue + fi +done + +echo "$data_result" +echo "$plugin_result" diff --git a/test/declarative/sync-cloudevent/Dockerfile.sync.cloudevent b/test/declarative/sync-cloudevent/Dockerfile.sync.cloudevent new file mode 100644 index 0000000..f11de01 --- /dev/null +++ b/test/declarative/sync-cloudevent/Dockerfile.sync.cloudevent @@ -0,0 +1,12 @@ +FROM golang:1.16-alpine + +ENV GO111MODULE=on +ENV GOPROXY=https://goproxy.cn,direct + +WORKDIR /ff + +ADD . /ff + +ENTRYPOINT ["go"] + +CMD ["run", "test/declarative/sync-cloudevent/main.go", "test/declarative/sync-cloudevent/cloudevent.go"] \ No newline at end of file diff --git a/test/declarative/sync-cloudevent/cloudevent.go b/test/declarative/sync-cloudevent/cloudevent.go new file mode 100644 index 0000000..46ccbe3 --- /dev/null +++ b/test/declarative/sync-cloudevent/cloudevent.go @@ -0,0 +1,18 @@ +package main + +import ( + "context" + + "github.com/OpenFunction/functions-framework-go/functions" + cloudevents "github.com/cloudevents/sdk-go/v2" + "k8s.io/klog/v2" +) + +func init() { + functions.CloudEvent("HelloWorld", HelloWorld, functions.WithFunctionPath("/")) +} + +func HelloWorld(ctx context.Context, ce cloudevents.Event) error { + klog.Infof("cloudevent - Data: %s", ce.Data()) + return nil +} diff --git a/test/declarative/sync-cloudevent/e2e.yaml b/test/declarative/sync-cloudevent/e2e.yaml new file mode 100644 index 0000000..afec111 --- /dev/null +++ b/test/declarative/sync-cloudevent/e2e.yaml @@ -0,0 +1,46 @@ +setup: + env: kind + file: ../../kind.yaml + steps: + - name: build and load image + command: | + docker build -t sync_e2e_cloudevent:latest -f test/declarative/sync-cloudevent/Dockerfile.sync.cloudevent . + kind load docker-image sync_e2e_cloudevent:latest + + - name: setup manifests + path: manifests.yaml + wait: + - namespace: default + resource: pod + label-selector: app=sync-cloudevent + for: condition=Ready + +# kind: +# expose-ports: +# - namespace: default +# resource: services/sync +# port: 12345 + + timeout: 30m + +cleanup: + # always never success failure + on: success + +#trigger: +# action: "http" +# interval: 3s +# times: 10 +# url: http://127.0.0.1:80 +# method: GET + +verify: + # verify with retry strategy + retry: + # max retry count + count: 60 + # the interval between two attempts, e.g. 10s, 1m. + interval: 10s + cases: + - query: bash test/declarative/sync-cloudevent/verify.sh 127.0.0.1:80 | yq + expected: expected.data.yaml diff --git a/test/declarative/sync-cloudevent/expected.data.yaml b/test/declarative/sync-cloudevent/expected.data.yaml new file mode 100644 index 0000000..8ad232c --- /dev/null +++ b/test/declarative/sync-cloudevent/expected.data.yaml @@ -0,0 +1,2 @@ +hello: world +sum: 2 \ No newline at end of file diff --git a/test/declarative/sync-cloudevent/main.go b/test/declarative/sync-cloudevent/main.go new file mode 100644 index 0000000..e2bc72e --- /dev/null +++ b/test/declarative/sync-cloudevent/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + + "github.com/fatih/structs" + "k8s.io/klog/v2" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/framework" + "github.com/OpenFunction/functions-framework-go/plugin" +) + +func main() { + ctx := context.Background() + fwk, err := framework.NewFramework() + if err != nil { + klog.Exit(err) + } + fwk.RegisterPlugins(getLocalPlugins()) + + if err := fwk.Start(ctx); err != nil { + klog.Exit(err) + } +} + +func getLocalPlugins() map[string]plugin.Plugin { + localPlugins := map[string]plugin.Plugin{ + Name: New(), + } + + if len(localPlugins) == 0 { + return nil + } else { + return localPlugins + } +} + +// Plugin + +const ( + Name = "plugin-custom" + Version = "v1" +) + +type PluginCustom struct { + PluginName string + PluginVersion string + StateC int64 +} + +var _ plugin.Plugin = &PluginCustom{} + +func New() *PluginCustom { + return &PluginCustom{ + StateC: int64(0), + } +} + +func (p *PluginCustom) Name() string { + return Name +} + +func (p *PluginCustom) Version() string { + return Version +} + +func (p *PluginCustom) Init() plugin.Plugin { + return New() +} + +func (p *PluginCustom) ExecPreHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + p.StateC++ + return nil +} + +func (p *PluginCustom) ExecPostHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + return nil +} + +func (p *PluginCustom) Get(fieldName string) (interface{}, bool) { + plgMap := structs.Map(p) + value, ok := plgMap[fieldName] + return value, ok +} diff --git a/test/declarative/sync-cloudevent/manifests.yaml b/test/declarative/sync-cloudevent/manifests.yaml new file mode 100644 index 0000000..3208b71 --- /dev/null +++ b/test/declarative/sync-cloudevent/manifests.yaml @@ -0,0 +1,62 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: sync-cloudevent + labels: + app: sync-cloudevent + case: sync +spec: + replicas: 1 + selector: + matchLabels: + app: sync-cloudevent + case: sync + template: + metadata: + labels: + app: sync-cloudevent + case: sync + spec: + containers: + - name: cloudevent + image: sync_e2e_cloudevent:latest + imagePullPolicy: IfNotPresent + env: + - name: FUNC_CONTEXT + value: | + { + "name": "sync-cloudevent", + "version": "v1", + "requestID": "a0f2ad8d-5062-4812-91e9-95416489fb01", + "port": "8080", + "prePlugins": ["plugin-custom", "plugin-example"], + "postPlugins": ["plugin-custom", "plugin-example"], + "inputs": {}, + "outputs": {}, + "runtime": "Knative" + } + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: POD_NAMESPACE + value: default + ports: + - containerPort: 8080 + name: function-port + protocol: TCP +--- +apiVersion: v1 +kind: Service +metadata: + name: sync +spec: + type: NodePort + selector: + app: sync-cloudevent + ports: + - protocol: TCP + port: 12345 + targetPort: 8080 + nodePort: 31234 diff --git a/test/declarative/sync-cloudevent/verify.sh b/test/declarative/sync-cloudevent/verify.sh new file mode 100644 index 0000000..3ea5a46 --- /dev/null +++ b/test/declarative/sync-cloudevent/verify.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +url=http://$1 +while true; do + st=$(curl -s -o /dev/null -w "%{http_code}" "$url" -H "Ce-Specversion: 1.0" -H "Ce-Type: io.openfunction.samples.helloworld" -H "Ce-Source: io.openfunction.samples/helloworldsource" -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" -H "Content-Type: application/json" -d '{"hello":"world"}') + if [ "$st" -eq 200 ]; then + data_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=2 -l app="sync-cloudevent" -c cloudevent | grep Data | awk '{ print $8 }' | yq -P '.' -) + plugin_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=2 -l app="sync-cloudevent" -c cloudevent | grep plugin | awk '{ print $8 }' | yq -P '.' -) + break + else + sleep 1 + continue + fi +done + +echo "$data_result" +echo "$plugin_result" diff --git a/test/declarative/sync-http-multiple/Dockerfile.sync.http b/test/declarative/sync-http-multiple/Dockerfile.sync.http new file mode 100644 index 0000000..821d7d9 --- /dev/null +++ b/test/declarative/sync-http-multiple/Dockerfile.sync.http @@ -0,0 +1,12 @@ +FROM golang:1.16-alpine + +ENV GO111MODULE=on +ENV GOPROXY=https://goproxy.cn,direct + +WORKDIR /ff + +ADD . /ff + +ENTRYPOINT ["go"] + +CMD ["run", "test/declarative/sync-http-multiple/main.go", "test/declarative/sync-http-multiple/http.go"] \ No newline at end of file diff --git a/test/declarative/sync-http-multiple/e2e.yaml b/test/declarative/sync-http-multiple/e2e.yaml new file mode 100644 index 0000000..eb375cb --- /dev/null +++ b/test/declarative/sync-http-multiple/e2e.yaml @@ -0,0 +1,48 @@ +setup: + env: kind + file: ../../kind.yaml + steps: + - name: build and load image + command: | + docker build -t sync_e2e_http:latest -f test/declarative/sync-http-multiple/Dockerfile.sync.http . + kind load docker-image sync_e2e_http:latest + + - name: setup manifests + path: manifests.yaml + wait: + - namespace: default + resource: pod + label-selector: app=sync-http + for: condition=Ready + +# kind: +# expose-ports: +# - namespace: default +# resource: service/sync +# port: 12345 + + timeout: 30m + +cleanup: + # always never success failure + on: success + +#trigger: +# action: "http" +# interval: 3s +# times: 10 +# url: http://127.0.0.1:80 +# method: GET + +verify: + # verify with retry strategy + retry: + # max retry count + count: 60 + # the interval between two attempts, e.g. 10s, 1m. + interval: 10s + cases: + - query: bash test/declarative/sync-http-multiple/verify.sh 127.0.0.1:80/helloworld | yq + expected: expected.data-helloworld.yaml + - query: bash test/declarative/sync-http-multiple/verify.sh 127.0.0.1:80/foo | yq + expected: expected.data-foo.yaml \ No newline at end of file diff --git a/test/declarative/sync-http-multiple/expected.data-foo.yaml b/test/declarative/sync-http-multiple/expected.data-foo.yaml new file mode 100644 index 0000000..2d50a06 --- /dev/null +++ b/test/declarative/sync-http-multiple/expected.data-foo.yaml @@ -0,0 +1,2 @@ +hello: foo +sum: 2 \ No newline at end of file diff --git a/test/declarative/sync-http-multiple/expected.data-helloworld.yaml b/test/declarative/sync-http-multiple/expected.data-helloworld.yaml new file mode 100644 index 0000000..8ad232c --- /dev/null +++ b/test/declarative/sync-http-multiple/expected.data-helloworld.yaml @@ -0,0 +1,2 @@ +hello: world +sum: 2 \ No newline at end of file diff --git a/test/declarative/sync-http-multiple/http.go b/test/declarative/sync-http-multiple/http.go new file mode 100644 index 0000000..9aec9e0 --- /dev/null +++ b/test/declarative/sync-http-multiple/http.go @@ -0,0 +1,31 @@ +package main + +import ( + "encoding/json" + "net/http" + + "github.com/OpenFunction/functions-framework-go/functions" +) + +func init() { + functions.HTTP("HelloWorld", helloWorld, functions.WithFunctionPath("/helloworld")) + functions.HTTP("Foo", foo, functions.WithFunctionPath("/foo")) +} + +func helloWorld(w http.ResponseWriter, r *http.Request) { + response := map[string]string{ + "hello": "world", + } + responseBytes, _ := json.Marshal(response) + w.Header().Set("Content-type", "application/json") + w.Write(responseBytes) +} + +func foo(w http.ResponseWriter, r *http.Request) { + response := map[string]string{ + "hello": "foo", + } + responseBytes, _ := json.Marshal(response) + w.Header().Set("Content-type", "application/json") + w.Write(responseBytes) +} diff --git a/test/declarative/sync-http-multiple/main.go b/test/declarative/sync-http-multiple/main.go new file mode 100644 index 0000000..e693e82 --- /dev/null +++ b/test/declarative/sync-http-multiple/main.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + + "github.com/fatih/structs" + "k8s.io/klog/v2" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/framework" + "github.com/OpenFunction/functions-framework-go/plugin" +) + +func main() { + ctx := context.Background() + fwk, err := framework.NewFramework() + if err != nil { + klog.Exit(err) + } + fwk.RegisterPlugins(getLocalPlugins()) + if err := fwk.Start(ctx); err != nil { + klog.Exit(err) + } +} + +func getLocalPlugins() map[string]plugin.Plugin { + localPlugins := map[string]plugin.Plugin{ + Name: New(), + } + + if len(localPlugins) == 0 { + return nil + } else { + return localPlugins + } +} + +// Plugin + +const ( + Name = "plugin-custom" + Version = "v1" +) + +type PluginCustom struct { + PluginName string + PluginVersion string + StateC int64 +} + +var _ plugin.Plugin = &PluginCustom{} + +func New() *PluginCustom { + return &PluginCustom{ + StateC: int64(0), + } +} + +func (p *PluginCustom) Name() string { + return Name +} + +func (p *PluginCustom) Version() string { + return Version +} + +func (p *PluginCustom) Init() plugin.Plugin { + return New() +} + +func (p *PluginCustom) ExecPreHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + p.StateC++ + return nil +} + +func (p *PluginCustom) ExecPostHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + return nil +} + +func (p *PluginCustom) Get(fieldName string) (interface{}, bool) { + plgMap := structs.Map(p) + value, ok := plgMap[fieldName] + return value, ok +} diff --git a/test/declarative/sync-http-multiple/manifests.yaml b/test/declarative/sync-http-multiple/manifests.yaml new file mode 100644 index 0000000..f5251d4 --- /dev/null +++ b/test/declarative/sync-http-multiple/manifests.yaml @@ -0,0 +1,62 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: sync-http + labels: + app: sync-http + case: sync +spec: + replicas: 1 + selector: + matchLabels: + app: sync-http + case: sync + template: + metadata: + labels: + app: sync-http + case: sync + spec: + containers: + - name: http + image: sync_e2e_http:latest + imagePullPolicy: IfNotPresent + env: + - name: FUNC_CONTEXT + value: | + { + "name": "sync-http", + "version": "v1", + "requestID": "a0f2ad8d-5062-4812-91e9-95416489fb01", + "port": "8080", + "prePlugins": ["plugin-custom", "plugin-example"], + "postPlugins": ["plugin-custom", "plugin-example"], + "inputs": {}, + "outputs": {}, + "runtime": "Knative" + } + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: POD_NAMESPACE + value: default + ports: + - containerPort: 8080 + name: function-port + protocol: TCP +--- +apiVersion: v1 +kind: Service +metadata: + name: sync +spec: + type: NodePort + selector: + app: sync-http + ports: + - protocol: TCP + port: 12345 + targetPort: 8080 + nodePort: 31234 diff --git a/test/declarative/sync-http-multiple/verify.sh b/test/declarative/sync-http-multiple/verify.sh new file mode 100644 index 0000000..174bbc4 --- /dev/null +++ b/test/declarative/sync-http-multiple/verify.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +url=http://$1 +while true; do + st=$(curl -s -o /dev/null -w "%{http_code}" -X GET "$url") + if [ "$st" -eq 200 ]; then + data_result=$(curl -X GET -H "Content-type: application/json" -H "Accept: application/json" -s "$url" | yq -P ".") + plugin_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=1 -l app="sync-http" -c http | awk '{ print $8 }' | yq -P '.' -) + break + else + sleep 1 + continue + fi +done + +echo "$data_result" +echo "$plugin_result" diff --git a/test/declarative/sync-http/Dockerfile.sync.http b/test/declarative/sync-http/Dockerfile.sync.http new file mode 100644 index 0000000..6c31771 --- /dev/null +++ b/test/declarative/sync-http/Dockerfile.sync.http @@ -0,0 +1,12 @@ +FROM golang:1.16-alpine + +ENV GO111MODULE=on +ENV GOPROXY=https://goproxy.cn,direct + +WORKDIR /ff + +ADD . /ff + +ENTRYPOINT ["go"] + +CMD ["run", "test/declarative/sync-http/main.go", "test/declarative/sync-http/http.go"] \ No newline at end of file diff --git a/test/declarative/sync-http/e2e.yaml b/test/declarative/sync-http/e2e.yaml new file mode 100644 index 0000000..9ca57a8 --- /dev/null +++ b/test/declarative/sync-http/e2e.yaml @@ -0,0 +1,46 @@ +setup: + env: kind + file: ../../kind.yaml + steps: + - name: build and load image + command: | + docker build -t sync_e2e_http:latest -f test/declarative/sync-http/Dockerfile.sync.http . + kind load docker-image sync_e2e_http:latest + + - name: setup manifests + path: manifests.yaml + wait: + - namespace: default + resource: pod + label-selector: app=sync-http + for: condition=Ready + +# kind: +# expose-ports: +# - namespace: default +# resource: service/sync +# port: 12345 + + timeout: 30m + +cleanup: + # always never success failure + on: success + +#trigger: +# action: "http" +# interval: 3s +# times: 10 +# url: http://127.0.0.1:80 +# method: GET + +verify: + # verify with retry strategy + retry: + # max retry count + count: 60 + # the interval between two attempts, e.g. 10s, 1m. + interval: 10s + cases: + - query: bash test/declarative/sync-http/verify.sh 127.0.0.1:80 | yq + expected: expected.data.yaml diff --git a/test/declarative/sync-http/expected.data.yaml b/test/declarative/sync-http/expected.data.yaml new file mode 100644 index 0000000..8ad232c --- /dev/null +++ b/test/declarative/sync-http/expected.data.yaml @@ -0,0 +1,2 @@ +hello: world +sum: 2 \ No newline at end of file diff --git a/test/declarative/sync-http/http.go b/test/declarative/sync-http/http.go new file mode 100644 index 0000000..84ca9f2 --- /dev/null +++ b/test/declarative/sync-http/http.go @@ -0,0 +1,30 @@ +package main + +import ( + "encoding/json" + "net/http" + + "github.com/OpenFunction/functions-framework-go/functions" +) + +func init() { + functions.HTTP("HelloWorld", helloWorld, functions.WithFunctionPath("/")) +} + +func helloWorld(w http.ResponseWriter, r *http.Request) { + response := map[string]string{ + "hello": "world", + } + responseBytes, _ := json.Marshal(response) + w.Header().Set("Content-type", "application/json") + w.Write(responseBytes) +} + +func foo(w http.ResponseWriter, r *http.Request) { + response := map[string]string{ + "hello": "bar", + } + responseBytes, _ := json.Marshal(response) + w.Header().Set("Content-type", "application/json") + w.Write(responseBytes) +} diff --git a/test/declarative/sync-http/main.go b/test/declarative/sync-http/main.go new file mode 100644 index 0000000..e693e82 --- /dev/null +++ b/test/declarative/sync-http/main.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + + "github.com/fatih/structs" + "k8s.io/klog/v2" + + ofctx "github.com/OpenFunction/functions-framework-go/context" + "github.com/OpenFunction/functions-framework-go/framework" + "github.com/OpenFunction/functions-framework-go/plugin" +) + +func main() { + ctx := context.Background() + fwk, err := framework.NewFramework() + if err != nil { + klog.Exit(err) + } + fwk.RegisterPlugins(getLocalPlugins()) + if err := fwk.Start(ctx); err != nil { + klog.Exit(err) + } +} + +func getLocalPlugins() map[string]plugin.Plugin { + localPlugins := map[string]plugin.Plugin{ + Name: New(), + } + + if len(localPlugins) == 0 { + return nil + } else { + return localPlugins + } +} + +// Plugin + +const ( + Name = "plugin-custom" + Version = "v1" +) + +type PluginCustom struct { + PluginName string + PluginVersion string + StateC int64 +} + +var _ plugin.Plugin = &PluginCustom{} + +func New() *PluginCustom { + return &PluginCustom{ + StateC: int64(0), + } +} + +func (p *PluginCustom) Name() string { + return Name +} + +func (p *PluginCustom) Version() string { + return Version +} + +func (p *PluginCustom) Init() plugin.Plugin { + return New() +} + +func (p *PluginCustom) ExecPreHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + p.StateC++ + return nil +} + +func (p *PluginCustom) ExecPostHook(ctx ofctx.RuntimeContext, plugins map[string]plugin.Plugin) error { + return nil +} + +func (p *PluginCustom) Get(fieldName string) (interface{}, bool) { + plgMap := structs.Map(p) + value, ok := plgMap[fieldName] + return value, ok +} diff --git a/test/declarative/sync-http/manifests.yaml b/test/declarative/sync-http/manifests.yaml new file mode 100644 index 0000000..f5251d4 --- /dev/null +++ b/test/declarative/sync-http/manifests.yaml @@ -0,0 +1,62 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: sync-http + labels: + app: sync-http + case: sync +spec: + replicas: 1 + selector: + matchLabels: + app: sync-http + case: sync + template: + metadata: + labels: + app: sync-http + case: sync + spec: + containers: + - name: http + image: sync_e2e_http:latest + imagePullPolicy: IfNotPresent + env: + - name: FUNC_CONTEXT + value: | + { + "name": "sync-http", + "version": "v1", + "requestID": "a0f2ad8d-5062-4812-91e9-95416489fb01", + "port": "8080", + "prePlugins": ["plugin-custom", "plugin-example"], + "postPlugins": ["plugin-custom", "plugin-example"], + "inputs": {}, + "outputs": {}, + "runtime": "Knative" + } + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: POD_NAMESPACE + value: default + ports: + - containerPort: 8080 + name: function-port + protocol: TCP +--- +apiVersion: v1 +kind: Service +metadata: + name: sync +spec: + type: NodePort + selector: + app: sync-http + ports: + - protocol: TCP + port: 12345 + targetPort: 8080 + nodePort: 31234 diff --git a/test/declarative/sync-http/verify.sh b/test/declarative/sync-http/verify.sh new file mode 100644 index 0000000..174bbc4 --- /dev/null +++ b/test/declarative/sync-http/verify.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +url=http://$1 +while true; do + st=$(curl -s -o /dev/null -w "%{http_code}" -X GET "$url") + if [ "$st" -eq 200 ]; then + data_result=$(curl -X GET -H "Content-type: application/json" -H "Accept: application/json" -s "$url" | yq -P ".") + plugin_result=$(KUBECONFIG=/tmp/e2e-k8s.config kubectl logs --tail=1 -l app="sync-http" -c http | awk '{ print $8 }' | yq -P '.' -) + break + else + sleep 1 + continue + fi +done + +echo "$data_result" +echo "$plugin_result"