-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
App Logic and Runtime Separation (#402)
# What This PR Does / Why We Need It This PR resolves #385 by introducing a new way to write and run apps focused around app logic implementing the interface `app.App`, and runtime logic (such as a standalone operator) existing as a layer over `app.App` which translates runtime-specific behaviors into the `app.App` method calls. ## App Logic App logic is now handled as implementation of the new `app.App` interface, which describes all possible app behaviors. A default implementation, akin to `simple.Operator`, exists as `simple.App` (created with `simple.NewApp`). A user can also write a custom implementation of `app.App` if they so wish, and it should be compatible with any runner that consumes an `app.App`/`app.Provider`. `app.Provider` is an interface for usage by runners, which provides an `app.Manifest` to the runner to get app capabilities, and then allows for creation of an `app.App` with configuration sourced from the runner (such as KubeConfig, which may have varying methods by which it is loaded depending on the runner). ## Runtime Logic This PR introduces only one runner, `simple.StandaloneOperator`, which runs an app as a standalone operator. Combined with `simple.App`, this gives users the same (and slightly more) functionality that `simple.Operator` currently does. Planned in a future PR is `plugin.App` (or a similar name), which will run the app as a plugin and translate gRPC admission and CallResource calls into the app method calls. ## Codegen & Documentation Codegen for `grafana-app-sdk component add` will be introduced in a future PR to keep this one lighter, alongside documentation and examples for writing an app in this new manner (and existing documentation will be updated to working with apps this way). This PR is focused on introducing the new functionality, and as the "old" (current) way of writing apps is still perfectly valid, documentation will be added and updated in a docs-focused PR. ## Other Changes This PR also contains a drive-by fix for app manifest go codegen, as it currently improperly duplicates the `Validation` field for mutation when enabled. --------- Co-authored-by: Igor Suleymanov <[email protected]>
- Loading branch information
1 parent
c24f2ea
commit c9ec2d8
Showing
9 changed files
with
1,945 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package app | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"net/http" | ||
|
||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/client-go/rest" | ||
|
||
"github.com/grafana/grafana-app-sdk/resource" | ||
) | ||
|
||
var ( | ||
// ErrNotImplemented is an error that indicates that an App method is not implemented by the App implementation, | ||
// or the App method is not implemented for the provided Kind. | ||
// Typically, the App's ManifestData should indicate this as well. | ||
ErrNotImplemented = errors.New("not implemented") | ||
|
||
ErrCustomRouteNotFound = errors.New("custom route not found") | ||
) | ||
|
||
// ConversionRequest is a request to convert a Kind from one version to another | ||
type ConversionRequest struct { | ||
SourceGVK schema.GroupVersionKind | ||
TargetGVK schema.GroupVersionKind | ||
Raw RawObject | ||
} | ||
|
||
// RawObject represents the raw bytes of the object and its encoding, optionally with a decoded version of the object, | ||
// which may be any valid resource.Object implementation. | ||
type RawObject struct { | ||
Raw []byte `json:",inline"` | ||
Object resource.Object `json:"-"` | ||
Encoding resource.KindEncoding `json:"-"` | ||
} | ||
|
||
// ResourceCustomRouteRequest is a request to a custom subresource | ||
type ResourceCustomRouteRequest struct { | ||
ResourceIdentifier resource.FullIdentifier | ||
SubresourcePath string | ||
Method string | ||
Headers http.Header | ||
Body []byte | ||
} | ||
|
||
type ResourceCustomRouteResponse struct { | ||
Headers http.Header | ||
StatusCode int | ||
Body []byte | ||
} | ||
|
||
// Config is the app configuration used in a Provider for instantiating a new App. | ||
// It contains kubernetes configuration for communicating with an API server, the App's ManifestData as fetched | ||
// by the runner, and additional arbitrary configuration details that may be app-specific. | ||
type Config struct { | ||
// KubeConfig is a kubernetes rest.Config used to communicate with the API server where the App's Kinds are stored. | ||
KubeConfig rest.Config | ||
// ManifestData is the fetched ManifestData the runner is using for determining app kinds and capabilities. | ||
ManifestData ManifestData | ||
// SpecificConfig is app-specific config (as opposed to generic config) | ||
SpecificConfig SpecificConfig | ||
} | ||
|
||
// SpecificConfig is app-specific configuration which can vary from app to app | ||
// TODO: better type than any | ||
type SpecificConfig any | ||
|
||
// Provider represents a type which can provide an app manifest, and create a new App when given a configuration. | ||
// It should be used by runners to determine an app's capabilities and create an instance of the app to run. | ||
type Provider interface { | ||
// Manifest returns a Manifest, which may contain ManifestData or may point to a location where ManifestData can be fetched from. | ||
// The runner should use the ManifestData to determine app capabilities. | ||
Manifest() Manifest | ||
// SpecificConfig is any app-specific config that cannot be loaded by the runner that should be provided in NewApp | ||
SpecificConfig() SpecificConfig | ||
// NewApp creates a new App instance using the provided config, or returns an error if an App cannot be instantiated. | ||
NewApp(Config) (App, error) | ||
} | ||
|
||
// Runnable represents a type which can be run until it errors or the provided channel is stopped (or receives a message) | ||
type Runnable interface { | ||
// Run runs the process and blocks until one of the following conditions are met: | ||
// * An unrecoverable error occurs, in which case it returns the error | ||
// * The provided context completes | ||
// * The process completes and does not need to run again | ||
Run(context.Context) error | ||
} | ||
|
||
type AdmissionRequest resource.AdmissionRequest | ||
type MutatingResponse resource.MutatingResponse | ||
|
||
// App represents an app platform application logical structure. | ||
// An App is typically run with a wrapper, such as simple.NewStandaloneOperator, | ||
// which will present a runtime layer (such as kubernetes webhooks in the case of an operator), | ||
// and translate those into calls to the App. The wrapper is typically also responsible for lifecycle management | ||
// and running the Runnable provided by Runner(). | ||
// Pre-built implementations of App exist in the simple package, but any type which implements App | ||
// should be capable of being run by an app wrapper. | ||
type App interface { | ||
// Validate validates the incoming request, and returns an error if validation fails | ||
Validate(ctx context.Context, request *AdmissionRequest) error | ||
// Mutate runs mutation on the incoming request, responding with a MutatingResponse on success, or an error on failure | ||
Mutate(ctx context.Context, request *AdmissionRequest) (*MutatingResponse, error) | ||
// Convert converts the object based on the ConversionRequest, returning a RawObject which MUST contain | ||
// the converted bytes and encoding (Raw and Encoding respectively), and MAY contain the Object representation of those bytes. | ||
// It returns an error if the conversion fails, or if the functionality is not supported by the app. | ||
Convert(ctx context.Context, req ConversionRequest) (*RawObject, error) | ||
// CallResourceCustomRoute handles the call to a resource custom route, and returns a response to the request or an error. | ||
// If the route doesn't exist, the implementer MAY return ErrCustomRouteNotFound to signal to the runner, | ||
// or may choose to return a response with a not found status code and custom body. | ||
// It returns an error if the functionality is not supported by the app. | ||
CallResourceCustomRoute(ctx context.Context, request *ResourceCustomRouteRequest) (*ResourceCustomRouteResponse, error) | ||
// ManagedKinds returns a slice of Kinds which are managed by this App. | ||
// If there are multiple versions of a Kind, each one SHOULD be returned by this method, | ||
// as app runners may depend on having access to all kinds. | ||
ManagedKinds() []resource.Kind | ||
// Runner returns a Runnable with an app main loop. Any business logic that is not/can not be exposed | ||
// via other App interfaces should be contained within this method. | ||
// Runnable MAY be nil, in which case, the app has no main loop business logic. | ||
Runner() Runnable | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
package app | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
|
||
"github.com/grafana/grafana-app-sdk/logging" | ||
"github.com/grafana/grafana-app-sdk/metrics" | ||
) | ||
|
||
var RunnableCollectorDefaultErrorHandler = func(ctx context.Context, err error) bool { | ||
logging.FromContext(ctx).Error("runner exited with error", "error", err) | ||
return true | ||
} | ||
|
||
// NewMultiRunner creates a new MultiRunner with Runners as an empty slice and ErrorHandler set to RunnableCollectorDefaultErrorHandler | ||
func NewMultiRunner() *MultiRunner { | ||
return &MultiRunner{ | ||
Runners: make([]Runnable, 0), | ||
ErrorHandler: RunnableCollectorDefaultErrorHandler, | ||
} | ||
} | ||
|
||
// MultiRunner implements Runnable for running multiple Runnable instances. | ||
type MultiRunner struct { | ||
Runners []Runnable | ||
// ErrorHandler is called if one of the Runners returns an error. If the function call returns true, | ||
// the context will be canceled and all other Runners will also be prompted to exit. | ||
// If ErrorHandler is nil, RunnableCollectorDefaultErrorHandler is used. | ||
ErrorHandler func(context.Context, error) bool | ||
// ExitWait is how long to wait for Runners to exit after ErrorHandler returns true or the context is canceled | ||
// before stopping execution and returning a timeout error instead of exiting gracefully. | ||
// If ExitWait is nil, Run execution will always block until all Runners have exited. | ||
ExitWait *time.Duration | ||
} | ||
|
||
// Run runs all Runners in separate goroutines, and calls ErrorHandler if any of them exits early with an error. | ||
// If ErrorHandler returns true (or if there is no ErrorHandler), the other Runners are canceled and the error is returned. | ||
func (m *MultiRunner) Run(ctx context.Context) error { | ||
propagatedContext, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
errs := make(chan error, len(m.Runners)) | ||
defer close(errs) | ||
wg := &sync.WaitGroup{} | ||
for _, runner := range m.Runners { | ||
wg.Add(1) | ||
go func(r Runnable) { | ||
err := r.Run(propagatedContext) | ||
wg.Done() | ||
if err != nil { | ||
errs <- err | ||
} | ||
}(runner) | ||
} | ||
for { | ||
select { | ||
case err := <-errs: | ||
handler := m.ErrorHandler | ||
if handler == nil { | ||
handler = RunnableCollectorDefaultErrorHandler | ||
} | ||
if handler(propagatedContext, err) { | ||
cancel() | ||
if m.ExitWait != nil { | ||
if waitOrTimeout(wg, *m.ExitWait) { | ||
return fmt.Errorf("exit wait time exceeded waiting for Runners to complete: %w", err) | ||
} | ||
} else { | ||
wg.Wait() // Wait for all the runners to stop | ||
} | ||
return err | ||
} | ||
case <-ctx.Done(): | ||
cancel() | ||
if m.ExitWait != nil { | ||
if waitOrTimeout(wg, *m.ExitWait) { | ||
return fmt.Errorf("exit wait time exceeded waiting for Runners to complete") | ||
} | ||
} else { | ||
wg.Wait() // Wait for all the runners to stop | ||
} | ||
return nil | ||
} | ||
} | ||
} | ||
|
||
// PrometheusCollectors implements metrics.Provider by returning prometheus collectors for all Runners that also | ||
// implement metrics.Provider. | ||
func (m *MultiRunner) PrometheusCollectors() []prometheus.Collector { | ||
collectors := make([]prometheus.Collector, 0) | ||
for _, runner := range m.Runners { | ||
if cast, ok := runner.(metrics.Provider); ok { | ||
collectors = append(collectors, cast.PrometheusCollectors()...) | ||
} | ||
} | ||
return collectors | ||
} | ||
|
||
// AddRunnable adds the provided Runnable to the Runners slice. If the slice is nil, it will create it. | ||
func (m *MultiRunner) AddRunnable(runnable Runnable) { | ||
if m.Runners == nil { | ||
m.Runners = make([]Runnable, 0) | ||
} | ||
m.Runners = append(m.Runners, runnable) | ||
} | ||
|
||
func waitOrTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { | ||
ch := make(chan struct{}) | ||
go func() { | ||
defer close(ch) | ||
wg.Wait() | ||
}() | ||
select { | ||
case <-ch: | ||
return false | ||
case <-time.After(timeout): | ||
return true | ||
} | ||
} | ||
|
||
var ( | ||
ErrOtherRunStopped = errors.New("run stopped by another run call") | ||
) | ||
|
||
func NewSingletonRunner(runnable Runnable, stopOnAny bool) *SingletonRunner { | ||
return &SingletonRunner{ | ||
Wrapped: runnable, | ||
StopOnAny: stopOnAny, | ||
} | ||
} | ||
|
||
// SingletonRunner runs a single Runnable but allows for multiple distinct calls to Run() which cn have independent lifecycles | ||
type SingletonRunner struct { | ||
Wrapped Runnable | ||
// StopOnAny tells the SingletonRunner to stop all Run() calls if any one of them is stopped | ||
StopOnAny bool | ||
|
||
mux sync.Mutex | ||
running bool | ||
wg sync.WaitGroup | ||
cancel context.CancelCauseFunc | ||
ctx context.Context | ||
} | ||
|
||
// Run runs until the provided context.Context is closed, the underlying Runnable completes, or | ||
// another call to Run is stopped and StopOnAny is set to true (in which case ErrOtherRunStopped is returned) | ||
func (s *SingletonRunner) Run(ctx context.Context) error { | ||
s.wg.Add(1) | ||
defer s.wg.Done() | ||
go func(c context.Context) { | ||
<-c.Done() | ||
if s.StopOnAny && s.cancel != nil { | ||
s.cancel(ErrOtherRunStopped) | ||
} | ||
}(ctx) | ||
|
||
func() { | ||
s.mux.Lock() | ||
defer s.mux.Unlock() | ||
if !s.running { | ||
s.running = true | ||
// Stop cancel propagation and set up our own cancel function | ||
derived := context.WithoutCancel(ctx) | ||
s.ctx, s.cancel = context.WithCancelCause(derived) | ||
go func() { | ||
s.wg.Wait() | ||
s.mux.Lock() | ||
s.running = false | ||
s.mux.Unlock() | ||
}() | ||
|
||
go func() { | ||
err := s.Wrapped.Run(s.ctx) | ||
s.cancel(err) | ||
}() | ||
} | ||
}() | ||
|
||
select { | ||
case <-s.ctx.Done(): | ||
return context.Cause(s.ctx) | ||
case <-ctx.Done(): | ||
} | ||
return nil | ||
} | ||
|
||
// PrometheusCollectors implements metrics.Provider by returning prometheus collectors for the wrapped Runnable if it implements metrics.Provider. | ||
func (s *SingletonRunner) PrometheusCollectors() []prometheus.Collector { | ||
if cast, ok := s.Wrapped.(metrics.Provider); ok { | ||
return cast.PrometheusCollectors() | ||
} | ||
return nil | ||
} |
Oops, something went wrong.