Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

strict_mode top-level config option #166

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/bundle/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type NewManagement interface {
GetGeneric(key any) (any, bool)
GetOrSetGeneric(key, value any) (actual any, loaded bool)
SetGeneric(key, value any)

GetStrictMode() bool
}

type componentErr struct {
Expand Down
1 change: 1 addition & 0 deletions internal/cli/common/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func CreateManager(
manager.OptSetMetrics(stats),
manager.OptSetTracer(trac),
manager.OptSetStreamsMode(streamsMode),
manager.OptSetStrictMode(conf.StrictMode),
}, mgrOpts...)

// Create resource manager.
Expand Down
41 changes: 41 additions & 0 deletions internal/component/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Observability interface {
Tracer() trace.TracerProvider
Path() []string
Label() string
GetStrictMode() bool
}

type mockObs struct{}
Expand All @@ -41,8 +42,48 @@ func (m mockObs) Label() string {
return ""
}

func (m mockObs) GetStrictMode() bool {
return false
}

// NoopObservability returns an implementation of Observability that does
// nothing.
func NoopObservability() Observability {
return mockObs{}
}

type mockObsStrictMode struct{ StrictMode bool }

func (m mockObsStrictMode) Metrics() metrics.Type {
return metrics.Noop()
}

func Noop() log.Modular {
return &log.Logger{}
}

func (m mockObsStrictMode) Logger() log.Modular {
return log.Noop()
}

func (m mockObsStrictMode) Tracer() trace.TracerProvider {
return noop.NewTracerProvider()
}

func (m mockObsStrictMode) Path() []string {
return nil
}

func (m mockObsStrictMode) Label() string {
return ""
}

func (m mockObsStrictMode) GetStrictMode() bool {
return true
}

// NoopObservability returns an implementation of Observability that does
// nothing.
func NoopObservabilityWithStrictMode() Observability {
return mockObsStrictMode{}
}
21 changes: 19 additions & 2 deletions internal/component/output/async_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (w *AsyncWriter) latencyMeasuringWrite(ctx context.Context, msg message.Bat
}

// loop is an internal loop that brokers incoming messages to output pipe.
func (w *AsyncWriter) loop() {
func (w *AsyncWriter) loop(strictMode bool) {
// Metrics paths
var (
mSent = w.stats.GetCounter("output_sent")
Expand Down Expand Up @@ -203,6 +203,23 @@ func (w *AsyncWriter) loop() {
return
}

if strictMode {
// check if there is an error in the payload, if there is then ack with the error.
hasErrors := false
err := ts.Payload.Iter(func(i int, part *message.Part) error {
if err := part.ErrorGet(); err != nil {
hasErrors = true
return err
}
return nil
})
if hasErrors {
w.log.Error("Failed to send message to %v: message batch contains errors\n", w.typeStr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the \n is necessary

Suggested change
w.log.Error("Failed to send message to %v: message batch contains errors\n", w.typeStr)
w.log.Error("Failed to send message to %v: message batch contains errors", w.typeStr)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other calls in this func to w.log.Error - do have a \n.

_ = ts.Ack(closeLeisureCtx, err)
continue
}
}

w.log.Trace("Attempting to write %v messages to '%v'.\n", ts.Payload.Len(), w.typeStr)
_, spans := tracing.WithChildSpans(w.tracer, traceName, ts.Payload)

Expand Down Expand Up @@ -255,7 +272,7 @@ func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error {
return component.ErrAlreadyStarted
}
w.transactions = ts
go w.loop()
go w.loop(w.mgr.GetStrictMode())
return nil
}

Expand Down
51 changes: 51 additions & 0 deletions internal/component/output/async_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,57 @@ func TestAsyncWriterCantConnect(t *testing.T) {
require.NoError(t, w.WaitForClose(ctx))
}

func TestAsyncWriterStictMode(t *testing.T) {
t.Parallel()

writerImpl := newAsyncMockWriter()

w, err := NewAsyncWriter("foo", 1, writerImpl, component.NoopObservabilityWithStrictMode())
if err != nil {
t.Error(err)
return
}

msgChan := make(chan message.Transaction)
resChan := make(chan error)

if err = w.Consume(msgChan); err != nil {
t.Error(err)
}

m := message.QuickBatch([][]byte{
[]byte(`foo`),
})

_ = m.Iter(func(i int, part *message.Part) error {
part.ErrorSet(errors.New("err1"))
return nil
})

go func() {
select {
case msgChan <- message.NewTransaction(m, resChan):
case <-time.After(time.Second):
t.Error("Timed out")
}
}()

select {
case writerImpl.connChan <- nil:
case <-time.After(time.Second):
t.Fatal("Timed out")
}

select {
case res, open := <-resChan:
require.True(t, open)
require.Equal(t, res, errors.New("err1"))
case <-time.After(time.Second):
t.Fatal("Timed out")
}

}

//------------------------------------------------------------------------------

func TestAsyncWriterCantSendClosed(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions internal/config/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
fieldTracer = "tracer"
fieldSystemCloseDelay = "shutdown_delay"
fieldSystemCloseTimeout = "shutdown_timeout"
fieldStrictMode = "strict_mode"
fieldTests = "tests"
)

Expand All @@ -33,6 +34,7 @@ type Type struct {
SystemCloseDelay string `yaml:"shutdown_delay"`
SystemCloseTimeout string `yaml:"shutdown_timeout"`
Tests []any `yaml:"tests"`
StrictMode bool `yaml:"strict_mode"`

rawSource any
}
Expand All @@ -59,6 +61,7 @@ func observabilityFields() docs.FieldSpecs {
}),
docs.FieldString(fieldSystemCloseDelay, "A period of time to wait for metrics and traces to be pulled or pushed from the process.").HasDefault("0s"),
docs.FieldString(fieldSystemCloseTimeout, "The maximum period of time to wait for a clean shutdown. If this time is exceeded Bento will forcefully close.").HasDefault("20s"),
docs.FieldBool(fieldStrictMode, "Set `strict_mode` to `true` to override the default behavior of sending messages batches containing errored messages to the `output` layer, instead rejecting the batch and propagating a `nack` back to the source.").HasDefault(false).Advanced().AtVersion("1.4.0"),
}
}

Expand Down Expand Up @@ -135,6 +138,11 @@ func noStreamFromParsed(prov docs.Provider, pConf *docs.ParsedConfig, conf *Type
return
}
}
if pConf.Contains(fieldStrictMode) {
if conf.StrictMode, err = pConf.FieldBool(fieldStrictMode); err != nil {
return
}
}
if pConf.Contains(fieldSystemCloseTimeout) {
if conf.SystemCloseTimeout, err = pConf.FieldString(fieldSystemCloseTimeout); err != nil {
return
Expand Down
7 changes: 7 additions & 0 deletions internal/manager/mock/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Manager struct {
M metrics.Type
L log.Modular
T trace.TracerProvider

StrictMode bool
}

// NewManager provides a new mock manager.
Expand Down Expand Up @@ -391,3 +393,8 @@ func (m *Manager) GetOrSetGeneric(key, value any) (actual any, loaded bool) {
func (m *Manager) SetGeneric(key, value any) {
m.genericValues.Store(key, value)
}

// SetGeneric attempts to set a generic resource to a given value by key.
func (m *Manager) GetStrictMode() bool {
return m.StrictMode
}
13 changes: 13 additions & 0 deletions internal/manager/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type Type struct {

// Generic key/value store for plugin implementations.
genericValues *sync.Map

StrictMode bool
}

// OptFunc is an opt setting for a manager type.
Expand Down Expand Up @@ -174,6 +176,13 @@ func OptSetStreamsMode(b bool) OptFunc {
}
}

// OptSetStrictMode sets the StrictMode
func OptSetStrictMode(b bool) OptFunc {
return func(t *Type) {
t.StrictMode = b
}
}

// OptSetFS determines which ifs.FS implementation to use for its filesystem.
// This can be used to override the default os based filesystem implementation.
func OptSetFS(fs ifs.FS) OptFunc {
Expand Down Expand Up @@ -305,6 +314,10 @@ func (t *Type) EngineVersion() string {
return t.engineVersion
}

func (t *Type) GetStrictMode() bool {
return t.StrictMode
}

//------------------------------------------------------------------------------

// ForStream returns a variant of this manager to be used by a particular stream
Expand Down
6 changes: 6 additions & 0 deletions public/service/stream_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type StreamBuilder struct {
configSpec docs.FieldSpecs
env *Environment
lintingDisabled bool
strictMode bool
gregfurman marked this conversation as resolved.
Show resolved Hide resolved
envVarLookupFn func(string) (string, bool)
}

Expand Down Expand Up @@ -145,6 +146,10 @@ func (s *StreamBuilder) SetThreads(n int) {
s.threads = n
}

func (s *StreamBuilder) SetStrictMode(mode bool) {
s.strictMode = mode
}

// PrintLogger is a simple Print based interface implemented by custom loggers.
type PrintLogger interface {
Printf(format string, v ...any)
Expand Down Expand Up @@ -914,6 +919,7 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) {
manager.OptSetEnvironment(env),
manager.OptSetBloblangEnvironment(s.env.getBloblangParserEnv()),
manager.OptSetFS(s.env.fs),
manager.OptSetStrictMode(s.strictMode),
)
if err != nil {
return nil, err
Expand Down
18 changes: 18 additions & 0 deletions website/docs/configuration/about.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ tracer:

shutdown_timeout: 20s
shutdown_delay: ""
strict_mode: false
```

</TabItem>
Expand Down Expand Up @@ -296,6 +297,21 @@ The `shutdown_timeout` option sets a hard deadline for Bento process to graceful

This option takes effect after the `shutdown_delay` duration has passed if that is enabled.

## Strict mode

:::caution EXPERIMENTAL
This configuration field is experimental and therefore breaking changes could be made to it outside of major version releases.
:::

Introduced in v1.4.0.

The default behavior of Bento is: attempt to send message batches that contain messages with errors to the configured sink. When `strict_mode` is set to `true`, Bento will instead reject all batches containing messages with errors, propagating a `nack` to the input layer. The input component's `nack` behavior will then determine how these rejected messages are handled, if the input doesn't support `nack'ing` a message, they will be re-processed from scratch.

More stable alternatives to `strict_mode` could be considered:

- [Error Handling][error_handling]
- [reject_errored][reject_errored]

[processors]: /docs/components/processors/about
[processors.mapping]: /docs/components/processors/mapping
[config-interp]: /docs/configuration/interpolation
Expand All @@ -304,3 +320,5 @@ This option takes effect after the `shutdown_delay` duration has passed if that
[config.resources]: /docs/configuration/resources
[json-references]: https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-03
[components]: /docs/components/about
[reject_errored]: /docs/components/outputs/reject_errored/
[error_handling]: /docs/configuration/error_handling
Loading