Skip to content

Commit

Permalink
fix(machine): improve timeouts (eg add EvalTimeout)
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Feb 15, 2025
1 parent f8a0a2f commit b0b34fb
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 29 deletions.
101 changes: 76 additions & 25 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ var _ Api = &Machine{}
type Machine struct {
// Maximum number of mutations that can be queued. Default: 1000.
QueueLimit int
// Time for a handler to execute. Default: 100ms. See Opts.HandlerTimeout.
// HandlerTimeout defined the time for a handler to execute, before it causes
// an Exception. Default: 1s. See also Opts.HandlerTimeout.
// Using HandlerTimeout can cause race conditions, see Event.IsValid().
HandlerTimeout time.Duration
// EvalTimeout is the time the machine will try to execute an eval func.
// Like any other handler, eval func also has HandlerTimeout. Default: 1s.
EvalTimeout time.Duration
// If true, the machine will print all exceptions to stdout. Default: true.
// Requires an ExceptionHandler binding and Machine.PanicToException set.
LogStackTrace bool
Expand Down Expand Up @@ -53,7 +58,7 @@ type Machine struct {
disposed atomic.Bool
queueRunning atomic.Bool
// tags are short strings describing the machine.
tags []string
tags atomic.Pointer[[]string]
// tracers are optional tracers for telemetry integrations.
tracers []Tracer
tracersLock sync.RWMutex
Expand All @@ -65,6 +70,7 @@ type Machine struct {
// states is a map of state names to state definitions.
// TODO refac: schema
states Struct
schemaVer atomic.Int32
statesLock sync.RWMutex
// activeStates is list of currently active states.
activeStates S
Expand Down Expand Up @@ -164,12 +170,13 @@ func New(ctx context.Context, statesStruct Struct, opts *Opts) *Machine {

m := &Machine{
HandlerTimeout: 100 * time.Millisecond,
EvalTimeout: time.Second,
LogStackTrace: true,
PanicToException: true,
QueueLimit: 1000,
DisposeTimeout: time.Second,

id: RandId(),
id: randId(),
states: parsedStates,
clock: Clock{},
handlers: []*handler{},
Expand Down Expand Up @@ -226,14 +233,15 @@ func New(ctx context.Context, statesStruct Struct, opts *Opts) *Machine {
m.QueueLimit = opts.QueueLimit
}
if len(opts.Tags) > 0 {
m.tags = slicesUniq(opts.Tags)
tags := slicesUniq(opts.Tags)
m.tags.Store(&tags)
}
m.detectEval = opts.DetectEval
parent = opts.Parent
m.parentId = opts.ParentID
}

if os.Getenv(EnvAmDetectEval) != "" || os.Getenv(EnvAmDebug) != "" {
if os.Getenv(EnvAmDetectEval) != "" {
m.detectEval = true
}

Expand Down Expand Up @@ -825,11 +833,17 @@ func (m *Machine) TimeSum(states S) uint64 {
// transition (Executed, Queued, Canceled).
// Like every mutation method, it will resolve relations and trigger handlers.
func (m *Machine) Add(states S, args A) Result {
if m.disposed.Load() || m.disposing.Load() ||
int(m.queueLen.Load()) >= m.QueueLimit {

if m.disposed.Load() || m.disposing.Load() {
return Canceled
}

// let Exception in even with a full queue, but only once
if int(m.queueLen.Load()) >= m.QueueLimit {
if !slices.Contains(states, Exception) || m.IsErr() {
return Canceled
}
}

m.queueMutation(MutationAdd, states, args, nil)
m.breakpoint(states, nil)

Expand Down Expand Up @@ -881,9 +895,7 @@ func (m *Machine) AddErr(err error, args A) Result {
// Like every mutation method, it will resolve relations and trigger handlers.
// AddErrState produces a stack trace of the error, if LogStackTrace is enabled.
func (m *Machine) AddErrState(state string, err error, args A) Result {
if m.disposed.Load() || m.disposing.Load() ||
int(m.queueLen.Load()) >= m.QueueLimit {

if m.disposed.Load() || m.disposing.Load() {
return Canceled
}
// TODO test Err()
Expand Down Expand Up @@ -961,12 +973,17 @@ func (m *Machine) Err() error {
// the transition (Executed, Queued, Canceled).
// Like every mutation method, it will resolve relations and trigger handlers.
func (m *Machine) Remove(states S, args A) Result {
if m.disposed.Load() || m.disposing.Load() ||
int(m.queueLen.Load()) >= m.QueueLimit {

if m.disposed.Load() || m.disposing.Load() {
return Canceled
}

// let Exception in even with a full queue, but only once
if int(m.queueLen.Load()) >= m.QueueLimit {
if !slices.Contains(states, Exception) || !m.IsErr() {
return Canceled
}
}

// return early if none of the states is active
m.queueLock.RLock()
lenQueue := len(m.queue)
Expand Down Expand Up @@ -1019,7 +1036,16 @@ func (m *Machine) ParentId() string {

// Tags returns machine's tags, a list of unstructured strings without spaces.
func (m *Machine) Tags() []string {
return m.tags
tags := m.tags.Load()
if tags != nil {
return slices.Clone(*tags)
}
return nil
}

// Tags returns machine's tags, a list of unstructured strings without spaces.
func (m *Machine) SetTags(tags []string) {
m.tags.Store(&tags)
}

// Is checks if all the passed states are currently active.
Expand Down Expand Up @@ -1251,16 +1277,18 @@ func (m *Machine) Eval(source string, fn func(), ctx context.Context) bool {
// wait with a timeout
select {

case <-time.After(m.HandlerTimeout / 2):
case <-time.After(m.EvalTimeout):
canceled.Store(true)
m.log(LogOps, "[eval:timeout] %s", source[0])
m.AddErr(fmt.Errorf("%w: eval:%s", ErrHandlerTimeout, source), nil)
m.AddErr(fmt.Errorf("%w: eval:%s", ErrEvalTimeout, source), nil)
return false

case <-m.ctx.Done():
canceled.Store(true)
return false

case <-ctx.Done():
canceled.Store(true)
m.log(LogDecisions, "[eval:ctxdone] %s", source[0])
return false

Expand Down Expand Up @@ -2057,7 +2085,7 @@ func (m *Machine) Log(msg string, args ...any) {
// LogLvl adds an internal log entry from the outside. It should be used only
// by packages extending pkg/machine. Use Log instead.
func (m *Machine) LogLvl(lvl LogLevel, msg string, args ...any) {
// refac: LvlMsg
// TODO refac: SysLog
if m.disposed.Load() {
return
}
Expand Down Expand Up @@ -2292,7 +2320,7 @@ func (m *Machine) processHandlers(e *Event) (Result, bool) {
m.tracersLock.RLock()
tx := m.t.Load()
for i := range m.tracers {
m.tracers[i].HandlerStart(m.t.Load(), handlerName, methodName)
m.tracers[i].HandlerStart(tx, handlerName, methodName)
}
m.tracersLock.RUnlock()
handlerCall := &handlerCall{
Expand All @@ -2302,21 +2330,23 @@ func (m *Machine) processHandlers(e *Event) (Result, bool) {
timeout: false,
}

// reuse the timer each time
m.handlerTimer.Reset(m.HandlerTimeout)
select {
case <-m.ctx.Done():
break
case m.handlerStart <- handlerCall:
}

// reuse the timer each time
m.handlerTimer.Reset(m.HandlerTimeout)

// wait on the result / timeout / context
select {

case <-m.ctx.Done():

case <-m.handlerTimer.C:
// notify the handler loop
// TODO wait for [Event.AcceptTimeout]
m.handlerTimeout <- struct{}{}
m.log(LogOps, "[cancel] (%s) by timeout", j(tx.TargetStates()))
err := fmt.Errorf("%w: %s", ErrHandlerTimeout, methodName)
Expand All @@ -2331,6 +2361,7 @@ func (m *Machine) processHandlers(e *Event) (Result, bool) {

case r := <-m.handlerPanic:
// recover partial state
// TODO pass tx info via &AT{}
m.recoverToErr(h, r)

case ret = <-m.handlerEnd:
Expand All @@ -2344,7 +2375,7 @@ func (m *Machine) processHandlers(e *Event) (Result, bool) {
// tracers
m.tracersLock.RLock()
for i := range m.tracers {
m.tracers[i].HandlerEnd(m.t.Load(), handlerName, methodName)
m.tracers[i].HandlerEnd(tx, handlerName, methodName)
}
m.tracersLock.RUnlock()

Expand Down Expand Up @@ -2402,21 +2433,34 @@ func (m *Machine) handlerLoop() {
return
}
ret := true
retCh := make(chan struct{})
endCh := make(chan struct{})
timeout := atomic.Bool{}

// fork for timeout
go func() {
// confirm the event is still valid
if !call.event.IsValid() {
close(endCh)
return
}
// check timeout
if timeout.Load() {
close(endCh)
return
}

// catch panics and fwd
if m.PanicToException {
defer catch()
}

// handler signature: FooState(e *am.Event)
// TODO optimize https://github.com/golang/go/issues/7818
callRet := call.fn.Call([]reflect.Value{reflect.ValueOf(call.event)})
if len(callRet) > 0 {
ret = callRet[0].Interface().(bool)
}
close(retCh)
close(endCh)
}()

// wait for result / timeout / context
Expand All @@ -2425,8 +2469,9 @@ func (m *Machine) handlerLoop() {
m.handlerLoopDone()
return
case <-m.handlerTimeout:
timeout.Store(true)
continue
case <-retCh:
case <-endCh:
// pass
}

Expand All @@ -2443,6 +2488,7 @@ func (m *Machine) handlerLoop() {
return

case m.handlerEnd <- ret:
// pass
}
}
}
Expand Down Expand Up @@ -2834,6 +2880,10 @@ func (m *Machine) GetStruct() Struct {
return maps.Clone(m.states)
}

func (m *Machine) SchemaVer() uint8 {
return uint8(m.schemaVer.Load())
}

// SetStruct sets the machine's state structure. It will automatically call
// VerifyStates with the names param and handle EventStructChange if successful.
// Note: it's not recommended to change the states structure of a machine which
Expand All @@ -2847,6 +2897,7 @@ func (m *Machine) SetStruct(statesStruct Struct, names S) error {

old := m.states
m.states = parseStruct(statesStruct)
m.schemaVer.Add(1)

err := m.VerifyStates(names)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions pkg/machine/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,29 @@ func (e *Event) Transition() *Transition {
return e.Machine().t.Load()
}

// IsValid confirm this event should still be processed. Useful for negotiation
// handlers, which can't use state context.
func (e *Event) IsValid() bool {
tx := e.Transition()
if tx == nil {
return false
}

return e.TransitionId == tx.ID && !tx.IsCompleted() && tx.Accepted
}

// AcceptTimeout is like IsValid, but requires the handler to stop executing
// after receiving [true].
func (e *Event) AcceptTimeout() bool {
panic("Not implemented")

// TODO notify the handler loop
// if !e.isClosed {
// close(e.done)
// }
}

// Clone clones only the essential data of the Event. Useful for tracing vs GC.
func (e *Event) Clone() *Event {
id := e.MachineId
if e.Machine() == nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/machine/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// Transition represents processing of a single mutation within a machine.
type Transition struct {
// ID is a unique identifier of the transition.
// TODO refac to Id()
ID string
// Steps is a list of steps taken by this transition (so far).
Steps []*Step
Expand Down Expand Up @@ -55,7 +56,7 @@ func newTransition(m *Machine, item *Mutation) *Transition {
defer m.activeStatesLock.RUnlock()

t := &Transition{
ID: RandId(),
ID: randId(),
Mutation: item,
TimeBefore: m.time(nil),
Machine: m,
Expand Down
11 changes: 9 additions & 2 deletions pkg/machine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type HandlerDispose func(id string, ctx context.Context)
// Opts struct is used to configure a new Machine.
type Opts struct {
// Unique ID of this machine. Default: random ID.
// TODO refac to Id
ID string
// Time for a handler to execute. Default: time.Second
HandlerTimeout time.Duration
Expand All @@ -75,6 +76,7 @@ type Opts struct {
// If true, the machine will die on panics.
DontPanicToException bool
// If true, the machine will NOT prefix its logs with its ID.
// TODO refac to DontLogId
DontLogID bool
// Custom relations resolver. Default: *DefaultRelationsResolver.
Resolver RelationsResolver
Expand All @@ -88,6 +90,7 @@ type Opts struct {
// Overrides ParentID. Default: nil.
Parent Api
// ParentID is the ID of the parent machine. Default: "".
// TODO refac to ParentId
ParentID string
// Tags is a list of tags for the machine. Default: nil.
Tags []string
Expand Down Expand Up @@ -133,7 +136,7 @@ type Api interface {

// ///// LOCAL

// Checking (local)
// Checking (local)o

IsErr() bool
Is(states S) bool
Expand Down Expand Up @@ -234,14 +237,18 @@ const (
var (
// ErrStateUnknown indicates that the state is unknown.
ErrStateUnknown = errors.New("state unknown")
// ErrStateInactive indicates that a neccessary state isn't active.
ErrStateInactive = errors.New("state not active")
// ErrCanceled can be used to indicate a canceled Transition.
ErrCanceled = errors.New("transition canceled")
// ErrQueued can be used to indicate a queued Transition.
ErrQueued = errors.New("transition queued")
// ErrInvalidArgs can be used to indicate invalid arguments.
ErrInvalidArgs = errors.New("invalid arguments")
// ErrHandlerTimeout can be used to indicate timed out mutation.
// ErrHandlerTimeout sindicate a timed out mutation.
ErrHandlerTimeout = errors.New("handler timeout")
// ErrHandlerTimeout sindicate a timed out eval func.
ErrEvalTimeout = errors.New("eval timeout")
// ErrTimeout can be used to indicate a timeout.
ErrTimeout = errors.New("timeout")
// ErrStateMissing is used to indicate missing states.
Expand Down
Loading

0 comments on commit b0b34fb

Please sign in to comment.