Skip to content

Commit

Permalink
feat: declarative function api (#48)
Browse files Browse the repository at this point in the history
* init declarative functions

Signed-off-by: Lize Cai <[email protected]>

* add tests for declarative functions

Signed-off-by: Lize Cai <[email protected]>

* update unit tests

Signed-off-by: Lize Cai <[email protected]>

* update e2e files for declarative functions

Signed-off-by: Lize Cai <[email protected]>

* add default http pattern for existing single function register method

Signed-off-by: Lize Cai <[email protected]>

* fix declarative tests, update CMD in dockerfile

Signed-off-by: Lize Cai <[email protected]>
  • Loading branch information
lizzzcai authored May 22, 2022
1 parent f1e311b commit 798fc07
Show file tree
Hide file tree
Showing 43 changed files with 1,670 additions and 29 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ on:
- 'context/**'
- 'plugin/**'
- 'framework/**'
- 'functions/**'
- 'internal/**'
- 'runtime/**'
- 'test/**'
- 'go.mod'
Expand Down Expand Up @@ -45,6 +47,14 @@ jobs:
e2e: "test/sync-http/e2e.yaml"
- name: Sync Cloudevent e2e test
e2e: "test/sync-cloudevent/e2e.yaml"
- name: Declarative Sync HTTP e2e test
e2e: "test/declarative/sync-http/e2e.yaml"
- name: Declarative Sync Cloudevent e2e test
e2e: "test/declarative/sync-cloudevent/e2e.yaml"
- name: Declarative multiple Sync HTTP e2e test
e2e: "test/declarative/sync-http-multiple/e2e.yaml"
- name: Declarative multiple Sync Cloudevent e2e test
e2e: "test/declarative/sync-cloudevent-multiple/e2e.yaml"
steps:
- uses: actions/checkout@v2

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/plugin_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ on:
- 'context/**'
- 'plugin/**'
- 'framework/**'
- 'functions/**'
- 'internal/**'
- 'runtime/**'
- 'test/**'
- 'go.mod'
Expand Down
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# IDE
.idea/
.vscode/
.vscode/

bin/
5 changes: 5 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
Success = 200
InternalError = 500
defaultPort = "8080"
defaultHttpPattern = "/"
daprSidecarGRPCPort = "50001"
TracingProviderSkywalking = "skywalking"
TracingProviderOpentelemetry = "opentelemetry"
Expand Down Expand Up @@ -796,6 +797,10 @@ func parseContext() (*FunctionContext, error) {
}
}

if ctx.HttpPattern == "" {
ctx.HttpPattern = defaultHttpPattern
}

// When using self-hosted mode, configure the client port via env,
// refer to https://docs.dapr.io/reference/environment/
port := os.Getenv("DAPR_GRPC_PORT")
Expand Down
78 changes: 73 additions & 5 deletions framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"errors"
"net/http"
"os"

cloudevents "github.com/cloudevents/sdk-go/v2"
"k8s.io/klog/v2"

ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/OpenFunction/functions-framework-go/internal/functions"
"github.com/OpenFunction/functions-framework-go/internal/registry"
"github.com/OpenFunction/functions-framework-go/plugin"
plgExample "github.com/OpenFunction/functions-framework-go/plugin/plugin-example"
"github.com/OpenFunction/functions-framework-go/plugin/skywalking"
Expand All @@ -23,6 +26,7 @@ type functionsFrameworkImpl struct {
postPlugins []plugin.Plugin
pluginMap map[string]plugin.Plugin
runtime runtime.Interface
registry *registry.Registry
}

// Framework is the interface for the function conversion.
Expand All @@ -36,6 +40,9 @@ type Framework interface {
func NewFramework() (*functionsFrameworkImpl, error) {
fwk := &functionsFrameworkImpl{}

// Set the function registry
fwk.registry = registry.Default()

// Parse OpenFunction FunctionContext
if ctx, err := ofctx.GetRuntimeContext(); err != nil {
klog.Errorf("failed to parse OpenFunction FunctionContext: %v\n", err)
Expand All @@ -59,17 +66,29 @@ func NewFramework() (*functionsFrameworkImpl, error) {

func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{}) error {
if fnHTTP, ok := fn.(func(http.ResponseWriter, *http.Request)); ok {
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnHTTP); err != nil {
rf, err := functions.New(functions.WithFunctionName(fwk.funcContext.GetName()), functions.WithHTTP(fnHTTP), functions.WithFunctionPath(fwk.funcContext.GetHttpPattern()))
if err != nil {
klog.Errorf("failed to register function: %v", err)
}
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
} else if fnOpenFunction, ok := fn.(func(ofctx.Context, []byte) (ofctx.Out, error)); ok {
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnOpenFunction); err != nil {
rf, err := functions.New(functions.WithFunctionName(fwk.funcContext.GetName()), functions.WithOpenFunction(fnOpenFunction), functions.WithFunctionPath(fwk.funcContext.GetHttpPattern()))
if err != nil {
klog.Errorf("failed to register function: %v", err)
}
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
} else if fnCloudEvent, ok := fn.(func(context.Context, cloudevents.Event) error); ok {
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, fnCloudEvent); err != nil {
rf, err := functions.New(functions.WithFunctionName(fwk.funcContext.GetName()), functions.WithCloudEvent(fnCloudEvent), functions.WithFunctionPath(fwk.funcContext.GetHttpPattern()))
if err != nil {
klog.Errorf("failed to register function: %v", err)
}
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
Expand All @@ -82,6 +101,56 @@ func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{})
}

func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {

target := os.Getenv("FUNCTION_TARGET")

// if FUNCTION_TARGET is provided
if len(target) > 0 {
if fn, ok := fwk.registry.GetRegisteredFunction(target); ok {
klog.Infof("registering function: %s on path: %s", target, fn.GetPath())
switch fn.GetFunctionType() {
case functions.HTTPType:
fwk.Register(ctx, fn.GetHTTPFunction())
case functions.CloudEventType:
fwk.Register(ctx, fn.GetCloudEventFunction())
case functions.OpenFunctionType:
fwk.Register(ctx, fn.GetOpenFunctionFunction())
}
} else {
klog.Errorf("function not found: %s", target)
}
} else {
// if FUNCTION_TARGET is not provided but user uses declarative function, by default all registered functions will be deployed.
funcNames := fwk.registry.GetFunctionNames()
if len(funcNames) > 1 && fwk.funcContext.GetRuntime() == ofctx.Async {
return errors.New("only one function is allowed in async runtime")
} else if len(funcNames) > 0 {
klog.Info("no 'FUNCTION_TARGET' is provided, register all the functions in the registry")
for _, name := range funcNames {
if rf, ok := fwk.registry.GetRegisteredFunction(name); ok {
klog.Infof("registering function: %s on path: %s", rf.GetName(), rf.GetPath())
switch rf.GetFunctionType() {
case functions.HTTPType:
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
case functions.CloudEventType:
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
case functions.OpenFunctionType:
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
}
}
}
}
}

err := fwk.runtime.Start(ctx)
if err != nil {
klog.Error("failed to start runtime service")
Expand Down Expand Up @@ -136,13 +205,12 @@ func createRuntime(fwk *functionsFrameworkImpl) error {
rt := fwk.funcContext.GetRuntime()
port := fwk.funcContext.GetPort()
pattern := fwk.funcContext.GetHttpPattern()

switch rt {
case ofctx.Knative:
fwk.runtime = knative.NewKnativeRuntime(port, pattern)
return nil
case ofctx.Async:
fwk.runtime, err = async.NewAsyncRuntime(port)
fwk.runtime, err = async.NewAsyncRuntime(port, pattern)
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions functions/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package functions

import (
"github.com/OpenFunction/functions-framework-go/internal/functions"
)

type FunctionOption = functions.FunctionOption

var (
WithFunctionPath = functions.WithFunctionPath
)
37 changes: 37 additions & 0 deletions functions/registers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Package functions provides a way to declaratively register functions
// that can be used to handle incoming requests.
package functions

import (
"context"
"log"
"net/http"

ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/OpenFunction/functions-framework-go/internal/registry"
cloudevents "github.com/cloudevents/sdk-go/v2"
)

// HTTP registers an HTTP function that becomes the function handler served
// at "/" when environment variable `FUNCTION_TARGET=name`
func HTTP(name string, fn func(http.ResponseWriter, *http.Request), options ...FunctionOption) {
if err := registry.Default().RegisterHTTP(name, fn, options...); err != nil {
log.Fatalf("failure to register function: %s", err)
}
}

// CloudEvent registers a CloudEvent function that becomes the function handler
// served at "/" when environment variable `FUNCTION_TARGET=name`
func CloudEvent(name string, fn func(context.Context, cloudevents.Event) error, options ...FunctionOption) {
if err := registry.Default().RegisterCloudEvent(name, fn, options...); err != nil {
log.Fatalf("failure to register function: %s", err)
}
}

// OpenFunction registers a OpenFunction function that becomes the function handler
// served at "/" when environment variable `FUNCTION_TARGET=name`
func OpenFunction(name string, fn func(ofctx.Context, []byte) (ofctx.Out, error), options ...FunctionOption) {
if err := registry.Default().RegisterOpenFunction(name, fn, options...); err != nil {
log.Fatalf("failure to register function: %s", err)
}
}
Loading

0 comments on commit 798fc07

Please sign in to comment.