Skip to content

Commit

Permalink
Add strict_mode as a top-level config field
Browse files Browse the repository at this point in the history
Signed-off-by: Jem Davies <[email protected]>
  • Loading branch information
jem-davies committed Nov 17, 2024
1 parent aadacc9 commit b058924
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 2 deletions.
1 change: 1 addition & 0 deletions internal/cli/common/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func CreateManager(
manager.OptSetMetrics(stats),
manager.OptSetTracer(trac),
manager.OptSetStreamsMode(streamsMode),
manager.OptSetStrictMode(conf.StrictMode),
}, mgrOpts...)

// Create resource manager.
Expand Down
32 changes: 32 additions & 0 deletions internal/component/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,35 @@ func (m mockObs) Label() string {
func NoopObservability() Observability {
return mockObs{}
}

type mockObsStrictMode struct{ StrictMode bool }

func (m mockObsStrictMode) Metrics() metrics.Type {
return metrics.Noop()
}

func Noop() log.Modular {
return &log.Logger{}
}

func (m mockObsStrictMode) Logger() log.Modular {
return log.Noop()
}

func (m mockObsStrictMode) Tracer() trace.TracerProvider {
return noop.NewTracerProvider()
}

func (m mockObsStrictMode) Path() []string {
return nil
}

func (m mockObsStrictMode) Label() string {
return ""
}

// NoopObservability returns an implementation of Observability that does
// nothing.
func NoopObservabilityWithStrictMode() Observability {
return mockObsStrictMode{StrictMode: true}
}
40 changes: 38 additions & 2 deletions internal/component/output/async_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package output
import (
"context"
"errors"
"reflect"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (w *AsyncWriter) latencyMeasuringWrite(ctx context.Context, msg message.Bat
}

// loop is an internal loop that brokers incoming messages to output pipe.
func (w *AsyncWriter) loop() {
func (w *AsyncWriter) loop(strictMode bool) {
// Metrics paths
var (
mSent = w.stats.GetCounter("output_sent")
Expand Down Expand Up @@ -203,6 +204,23 @@ func (w *AsyncWriter) loop() {
return
}

if strictMode {
// check if there is an error in the payload, if there is then ack with the error.
hasErrors := false
err := ts.Payload.Iter(func(i int, part *message.Part) error {
if err := part.ErrorGet(); err != nil {
hasErrors = true
return err
}
return nil
})
if hasErrors {
w.log.Error("Failed to send message to %v: message batch contains errors\n", w.typeStr)
_ = ts.Ack(closeLeisureCtx, err)
continue
}
}

w.log.Trace("Attempting to write %v messages to '%v'.\n", ts.Payload.Len(), w.typeStr)
_, spans := tracing.WithChildSpans(w.tracer, traceName, ts.Payload)

Expand Down Expand Up @@ -249,13 +267,31 @@ func (w *AsyncWriter) loop() {
wg.Wait()
}

func (w *AsyncWriter) getStrictMode() bool {
v := reflect.ValueOf(w.mgr)

if v.Kind() == reflect.Pointer && !v.IsNil() {
v = v.Elem()
}

if v.Kind() == reflect.Struct {
field := v.FieldByName("StrictMode")
if field.IsValid() && field.Kind() == reflect.Bool {
return field.Bool()
}
}

return false
}

// Consume assigns a messages channel for the output to read.
func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error {
if w.transactions != nil {
return component.ErrAlreadyStarted
}
w.transactions = ts
go w.loop()
strictMode := w.getStrictMode()
go w.loop(strictMode)
return nil
}

Expand Down
51 changes: 51 additions & 0 deletions internal/component/output/async_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,57 @@ func TestAsyncWriterCantConnect(t *testing.T) {
require.NoError(t, w.WaitForClose(ctx))
}

func TestAsyncWriterStictMode(t *testing.T) {
t.Parallel()

writerImpl := newAsyncMockWriter()

w, err := NewAsyncWriter("foo", 1, writerImpl, component.NoopObservabilityWithStrictMode())
if err != nil {
t.Error(err)
return
}

msgChan := make(chan message.Transaction)
resChan := make(chan error)

if err = w.Consume(msgChan); err != nil {
t.Error(err)
}

m := message.QuickBatch([][]byte{
[]byte(`foo`),
})

_ = m.Iter(func(i int, part *message.Part) error {
part.ErrorSet(errors.New("err1"))
return nil
})

go func() {
select {
case msgChan <- message.NewTransaction(m, resChan):
case <-time.After(time.Second):
t.Error("Timed out")
}
}()

select {
case writerImpl.connChan <- nil:
case <-time.After(time.Second):
t.Fatal("Timed out")
}

select {
case res, open := <-resChan:
require.True(t, open)
require.Equal(t, res, errors.New("err1"))
case <-time.After(time.Second):
t.Fatal("Timed out")
}

}

//------------------------------------------------------------------------------

func TestAsyncWriterCantSendClosed(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions internal/config/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
fieldTracer = "tracer"
fieldSystemCloseDelay = "shutdown_delay"
fieldSystemCloseTimeout = "shutdown_timeout"
fieldStrictMode = "strict_mode"
fieldTests = "tests"
)

Expand All @@ -33,6 +34,7 @@ type Type struct {
SystemCloseDelay string `yaml:"shutdown_delay"`
SystemCloseTimeout string `yaml:"shutdown_timeout"`
Tests []any `yaml:"tests"`
StrictMode bool `yaml:"strict_mode"`

rawSource any
}
Expand All @@ -59,6 +61,7 @@ func observabilityFields() docs.FieldSpecs {
}),
docs.FieldString(fieldSystemCloseDelay, "A period of time to wait for metrics and traces to be pulled or pushed from the process.").HasDefault("0s"),
docs.FieldString(fieldSystemCloseTimeout, "The maximum period of time to wait for a clean shutdown. If this time is exceeded Bento will forcefully close.").HasDefault("20s"),
docs.FieldBool(fieldStrictMode, "").HasDefault(false),
}
}

Expand Down Expand Up @@ -135,6 +138,11 @@ func noStreamFromParsed(prov docs.Provider, pConf *docs.ParsedConfig, conf *Type
return
}
}
if pConf.Contains(fieldStrictMode) {
if conf.StrictMode, err = pConf.FieldBool(fieldStrictMode); err != nil {
return
}
}
if pConf.Contains(fieldSystemCloseTimeout) {
if conf.SystemCloseTimeout, err = pConf.FieldString(fieldSystemCloseTimeout); err != nil {
return
Expand Down
8 changes: 8 additions & 0 deletions internal/manager/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type Type struct {

// Generic key/value store for plugin implementations.
genericValues *sync.Map

StrictMode bool
}

// OptFunc is an opt setting for a manager type.
Expand Down Expand Up @@ -174,6 +176,12 @@ func OptSetStreamsMode(b bool) OptFunc {
}
}

func OptSetStrictMode(b bool) OptFunc {
return func(t *Type) {
t.StrictMode = b
}
}

// OptSetFS determines which ifs.FS implementation to use for its filesystem.
// This can be used to override the default os based filesystem implementation.
func OptSetFS(fs ifs.FS) OptFunc {
Expand Down
7 changes: 7 additions & 0 deletions public/service/stream_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type StreamBuilder struct {
env *Environment
lintingDisabled bool
envVarLookupFn func(string) (string, bool)
strictMode bool
}

// NewStreamBuilder creates a new StreamBuilder.
Expand All @@ -89,6 +90,7 @@ func NewStreamBuilder() *StreamBuilder {
configSpec: tmpSpec,
env: globalEnvironment,
envVarLookupFn: os.LookupEnv,
strictMode: false,
}
}

Expand Down Expand Up @@ -145,6 +147,10 @@ func (s *StreamBuilder) SetThreads(n int) {
s.threads = n
}

func (s *StreamBuilder) SetStrictMode(mode bool) {
s.strictMode = mode
}

// PrintLogger is a simple Print based interface implemented by custom loggers.
type PrintLogger interface {
Printf(format string, v ...any)
Expand Down Expand Up @@ -914,6 +920,7 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) {
manager.OptSetEnvironment(env),
manager.OptSetBloblangEnvironment(s.env.getBloblangParserEnv()),
manager.OptSetFS(s.env.fs),
manager.OptSetStrictMode(s.strictMode),
)
if err != nil {
return nil, err
Expand Down
18 changes: 18 additions & 0 deletions website/docs/configuration/about.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ tracer:

shutdown_timeout: 20s
shutdown_delay: ""
strict_mode: false
```
</TabItem>
Expand Down Expand Up @@ -296,6 +297,21 @@ The `shutdown_timeout` option sets a hard deadline for Bento process to graceful

This option takes effect after the `shutdown_delay` duration has passed if that is enabled.

## Strict mode

:::caution EXPERIMENTAL
This configuration field is experimental and therefore breaking changes could be made to it outside of major version releases.
:::

Introduced in v1.4.0.

The default behavior of Bento is to attempt to send messages that have errored to the configured sink. The `strict_mode` option can be set to `true` to override this default behavior of Bento, and instead will reject any message batches with messages that have errors in them. Ultimately this will propagate a Nack to the input layer, which then depending on the component configured will either be handled or the message will be reprocessed from scratch.

More stable alternatives to `strict_mode` could be considered:

- [Error Handling][error_handling]
- [`reject_errored`][reject_errored]

[processors]: /docs/components/processors/about
[processors.mapping]: /docs/components/processors/mapping
[config-interp]: /docs/configuration/interpolation
Expand All @@ -304,3 +320,5 @@ This option takes effect after the `shutdown_delay` duration has passed if that
[config.resources]: /docs/configuration/resources
[json-references]: https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-03
[components]: /docs/components/about
[reject_errored]: /docs/components/outputs/reject_errored/
[error_handling]: /docs/configuration/error_handling

0 comments on commit b058924

Please sign in to comment.