Introduce FiringMode.Serial #527

Open
gao-artur opened this issue Apr 29, 2023 · 14 comments

Comments

@gao-artur

Currently, stateless supports two FiringModes: Immediate and Queued. Both have their use cases, but unfortunately, neither matches my needs.
Immediate will change the state even if another state change is already in progress on another thread, so it can only be used in a single-threaded environment.
Queued will enqueue the state change and return to the caller immediately, while the enqueued job is executed by the thread that executes the current state change. Both the immediate return and forwarding the work to another thread are unacceptable in my scenario. In addition, it has its own multi-threaded usage problems.
I suggest introducing a new FiringMode.Serial. In this mode, the call will block while there is an active state change and will fire only after it has finished. I'm just starting to learn the source code of the library, but it looks like this can be achieved very easily by introducing a SemaphoreSlim-based lock in the InternalFire(Async) methods.

private readonly SemaphoreSlim _firingLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

void InternalFire(TTrigger trigger, params object[] args)
{
    switch (_firingMode)
    {
        ...
        case FiringMode.Serial:
            // Block until any in-progress transition has completed.
            _firingLock.Wait();
            try
            {
                InternalFireOne(trigger, args);
            }
            finally
            {
                _firingLock.Release();
            }
            break;
        default:
            ...
    }
}

async Task InternalFireAsync(TTrigger trigger, params object[] args)
{
    switch (_firingMode)
    {
        ...
        case FiringMode.Serial:
            // Asynchronously wait until any in-progress transition has completed.
            await _firingLock.WaitAsync();
            try
            {
                await InternalFireOneAsync(trigger, args);
            }
            finally
            {
                _firingLock.Release();
            }
            break;
        default:
            ...
    }
}

Also, since the lock can be held for a long time, it would be good to introduce Fire(Async) overloads that accept a CancellationToken.
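A minimal sketch of how the serial path could honor a token (the overload shape is just an illustration, not an existing API; only the wait changes compared to the snippet above):

async Task InternalFireAsync(TTrigger trigger, CancellationToken cancellationToken, params object[] args)
{
    ...
    // A cancelled wait throws OperationCanceledException before the transition starts.
    await _firingLock.WaitAsync(cancellationToken);
    try
    {
        await InternalFireOneAsync(trigger, args);
    }
    finally
    {
        _firingLock.Release();
    }
    ...
}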

@leeoades
Contributor

I didn't think stateless supported being used in a multithreaded way.

Having multiple threads attempting state changes seems extremely problematic. One thread could be checking a guard whilst another thread changes the state immediately. The first thread then approves the state change and moves to the target state which may not be a permittable transition.

I wouldn't be encouraging users to interact with a state machine in a multithreaded fashion. If I were you, I would be looking to queue the transitions and have them processed sequentially.

@gao-artur
Author

You are right, stateless is not thread-safe, and it's documented as such. But the fact that FiringMode.Queued exists suggests it can be used in a multithreaded environment (even though there is a race condition on the _firing field).
The new mode I suggest will execute all Fire methods serially, which means concurrent callers will be blocked until the active transition completes. This is similar to enqueueing transitions, with one important difference: the caller won't be released before the transition it initiated has finished (a hard requirement in my use case). Hence this scenario is not possible:

One thread could be checking a guard whilst another thread changes the state immediately. The first thread then approves the state change and moves to the target state which may not be a permittable transition.

As a workaround, I'm currently using a SemaphoreSlim lock around the public methods that Fire. But built-in support sounds legitimate and less fragile to me.
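Roughly, the caller-side workaround looks like this (a sketch; the wrapper method and trigger names are just placeholders):

// Every public entry point that fires a trigger goes through the same semaphore,
// so transitions never overlap and each caller returns only after its own transition completes.
private readonly SemaphoreSlim _fireLock = new SemaphoreSlim(1, 1);

public async Task StartAsync()
{
    await _fireLock.WaitAsync();
    try
    {
        await _stateMachine.FireAsync(Trigger.Start);
    }
    finally
    {
        _fireLock.Release();
    }
}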

@gao-artur
Author

There is a caveat with my proposal: if the entry action of one state Fires another trigger internally, it will cause a deadlock because SemaphoreSlim doesn't support re-entrance...
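For example (illustrative configuration only, the state and trigger names are made up), this kind of setup would deadlock under the proposed implementation:

stateMachine.Configure(State.StateB)
    .Permit(Trigger.Abort, State.StateC)
    // The outer Fire() still holds _firingLock while this entry action runs,
    // so the nested Fire() waits on the same semaphore forever.
    .OnEntry(() => stateMachine.Fire(Trigger.Abort));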

@leeoades
Contributor

leeoades commented May 1, 2023

In a multithreaded environment, I would always wrap things to de-parallelise as soon as possible (wherever that makes sense) at a boundary, so that everything within can be safely single-threaded.
I would do the same here. If you must have your threads block and fire triggers sequentially, then have them controlled by a SemaphoreSlim in a layer above stateless rather than trying to bring that into the state machine.

@gao-artur
Author

Right, the workaround exists. The problem with any workaround is that it must be remembered every time a change is introduced in the code. A new developer who maintains the code after me, or even I, must remember to wrap the call with a semaphore. The question is whether built-in support is legitimate. I think it is, but with a different implementation to avoid deadlocks.

@crozone
Collaborator

crozone commented May 1, 2023

I think there is some misconception about the way in which the firing modes are designed to work, as well as their intended use-cases. Perhaps this needs to be cleaned up in the documentation.

But the fact the FiringMode.Queued exists suggests it can be used in a multithreaded environment (even though there is a race condition on the _firing field).

Stateless is not designed to handle any concurrency. I'd like to avoid using the term "multithreaded", because it is confusing when discussing Stateless' async methods. A state machine object is fine to be called from many different threads, so long as the methods are not called concurrently. For example, the execution flow might bounce between many different threadpool threads when calling Stateless' async methods, but they're still only being run in a serialized fashion, one after the other.

Back to FiringMode.Queued, it is not designed for concurrency either. Rather, it is specifically intended to support the use-case where you call a Fire() method from within a transition function itself. This enqueues the firing of the trigger until all of the currently executing transition functions have completed. This is often the expected behaviour, which is why FiringMode.Queued is the default.

Consider the following triggers:

stateMachine.Configure(State.StateA)
    .Permit(Trigger.StateBTransitionTrigger, State.StateB)
    .OnEntryAsync(async () =>
    {
        Console.WriteLine("A");
        await stateMachine.FireAsync(Trigger.StateBTransitionTrigger);
        Console.WriteLine("B");
    });

stateMachine.Configure(State.StateB)
    .OnEntryAsync(async () =>
    {
        Console.WriteLine("C");
    });

With FiringMode.Immediate, the call to stateMachine.FireAsync(Trigger.StateBTransitionTrigger) will immediately start executing all of the transition functions for State.StateB and then transition the state into StateB during the fire call. Then the fire call will return, and the rest of the lambda will execute. The console would print A, C, B.

With FiringMode.Queued, the call to stateMachine.FireAsync(Trigger.StateBTransitionTrigger) will be delayed until after all of the current state transition functions have completed. The console would print A, B, C.

It sounds like what you want with FiringMode.Serial is actually unrelated to these firing modes - instead, you need some sort of work queue which will serialize the firing of triggers coming in from several concurrent sources. This work queue can then dequeue the trigger events and call the state machine in a serialized fashion.

This is similar to the "do activities" issue here: #77

See my other comment here for an example implementation of a work queue which can serialize the state machine fire events: #77 (comment)

Considering that this use-case keeps coming up again and again, it's worth thinking about how we can build this directly into Stateless itself. I'm just not sure how a FiringMode could adequately support it, given that it probably needs a dedicated worker task to be running.

@gao-artur
Author

@crozone, thanks for the explanation of FiringMode.Queued. I didn't even think in this direction. The documentation around this mode should definitely be improved. But I have to admit I don't have experience with FSMs. Maybe it's something trivial in this world.

Currently, SemaphoreSlim works for me because it preserves the order of waiting tasks, so it's similar to using a firing queue.
I thought about queues but for a different reason. In addition to what I described above, I need to give higher priority to some transitions (for example, an Abort transition should be executed immediately after the active transition, no matter how many are already waiting in the queue).

Continuing the idea of the worker task and priorities: when the new mode is used (call it Serial or whatever sounds more correct), a worker task can be created that starts polling items from the queue. Ordinary transitions would have "Normal" priority, a transition triggered from another transition would have "Highest", and the user should be able to define "High" priority transitions that execute before "Normal" ones (the names are just examples; numbers can be used for more flexibility).

I think this way, thread safety can be built into the library while providing additional functionality.

Note that Channel doesn't support priorities, and the new PriorityQueue is not stable (it doesn't preserve insertion order within a priority).
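For what it's worth, a stable queue can be built on top of .NET's PriorityQueue by pairing the priority with a monotonically increasing sequence number (a sketch, not something the library offers):

// (priority, sequence) tuples break ties by insertion order, making dequeue order stable.
sealed class StableTriggerQueue<T>
{
    private readonly PriorityQueue<T, (int Priority, long Sequence)> _queue = new();
    private long _sequence;

    // Lower priority values dequeue first; equal priorities dequeue in insertion order.
    public void Enqueue(T item, int priority) => _queue.Enqueue(item, (priority, _sequence++));

    public bool TryDequeue(out T item) => _queue.TryDequeue(out item, out _);
}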

@crozone
Collaborator

crozone commented Jul 4, 2023

@mclift Do you think we should start a proper design discussion for a FiringMode.Serial, given that we keep getting issues relating to concurrent queuing of triggers? It's definitely a super common use-case.

I think the work items would be:

  1. More clearly document the differences between FiringMode.Immediate and FiringMode.Queued, with an explicit explanation that neither is threadsafe.
  2. Design a FiringMode.Serial / Serialized mode which internally queues triggers in a threadsafe manner.
  3. Put together a draft PR. I now agree with @gao-artur that SemaphoreSlim is likely ideal for this since it handles synchronous and asynchronous synchronization simultaneously, with the difficult code handled by the framework.

@crozone
Collaborator

crozone commented Jul 13, 2023

I've been looking into this and considering the use of SemaphoreSlim. While researching the implementation of AsyncLock, I've come up with an alternative that doesn't require the use of SemaphoreSlim at all.

Since we're simply serializing the firing of triggers, we can get away with something as simple as a ContinueWith() chain:

private object currentTaskLock = new object();
private Task currentTask = Task.CompletedTask;

private Task EnqueueTask(Func<Task> action)
{
    lock (currentTaskLock)
    {
        // If nothing is in flight, start the action directly; otherwise chain it
        // as a continuation of whatever is currently executing.
        currentTask = currentTask.IsCompleted
            ? action()
            : currentTask.ContinueWith((t) => action(),
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default).Unwrap();

        return currentTask;
    }
}

The basic idea is that any currently executing Fire() task is stored as the currentTask. If a new task is queued while the current task is still in progress, it is simply queued as a continuation of the current task, and the current task is replaced with the continuation task. If the current task has finished, it is instead replaced with the new task to avoid storing a huge chain of completed tasks.

Synchronous Fire() calls can be handled by wrapping them in Tasks and then waiting. SemaphoreSlim might still be a better solution for mixing async and sync Fire() calls together for this reason.
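For example, the synchronous path could be as small as this (a hypothetical shape, not from an actual PR):

// Wrap the synchronous transition in a Task, queue it behind the current task,
// and block the caller until its own transition has finished.
void InternalFire(TTrigger trigger, params object[] args)
{
    EnqueueTask(() => Task.Run(() => InternalFireOne(trigger, args)))
        .GetAwaiter().GetResult();
}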

@gao-artur
Author

I'm not an expert on the ContinueWith methods, so I may be wrong. But according to the docs, TaskContinuationOptions.ExecuteSynchronously will run the continuation state change on the same thread as the previous state change. That means the caller of the previous state change will be unnecessarily blocked for a prolonged time. I think you should use TaskContinuationOptions.RunContinuationsAsynchronously here.

@leeoades
Contributor

leeoades commented Jul 13, 2023 via email

@crozone
Collaborator

crozone commented Jul 17, 2023

@leeoades I appreciate your concern; stateless has very large consumers that we don't want to subtly break with a change like this.

The use-case for FiringMode.Serial is where a state machine is driven asynchronously by several other tasks which each generate events (eg tasks reading from serial ports, receiving messages on sockets, etc). All of these events are fed into the state machine as triggers which can change application state. This seems to be a common scenario for many smaller embedded style applications where the state machine forms the core of the application logic.

Since the state machine is not threadsafe, the trigger needs to be first put into some sort of work queue (like a Channel<T>) and then processed on a dedicated worker thread. This works nicely, but is additional boilerplate that needs to be re-implemented for a seemingly common use-case.
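The boilerplate in question usually looks something like this (a consumer-side sketch; the member names are made up):

// System.Threading.Channels: producers post triggers into an unbounded channel;
// a single worker task drains it and is the only code that ever calls the
// (non-threadsafe) state machine.
private readonly Channel<Trigger> _triggers = Channel.CreateUnbounded<Trigger>();

public void Post(Trigger trigger) => _triggers.Writer.TryWrite(trigger);

private async Task RunWorkerAsync(CancellationToken cancellationToken)
{
    await foreach (var trigger in _triggers.Reader.ReadAllAsync(cancellationToken))
    {
        await _stateMachine.FireAsync(trigger);
    }
}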

The state machine already contains an internal queuing mechanism for the case when triggers are fired from within a state transition, which is enabled when the firing mode is set to FiringMode.Queued (the default). If this internal queuing mechanism could be made threadsafe, it could effectively be used as the work queue. The semantics of how this works need to be carefully considered to avoid deadlocks (eg when firing triggers from within a transition), but it should theoretically work quite well.

This would be implemented as an additional firing mode, FiringMode.Serial, with its own dedicated codepath. FiringMode.Immediate and FiringMode.Queued already have their own codepaths, so the code is already structured this way.

The default firing mode, FiringMode.Queued, would remain as it is (not threadsafe) and should/must not suffer a performance regression as a result of this feature. It should ideally remain almost entirely untouched. We definitely need to ensure that the synchronization context is maintained (I believe we have a unit test for this?).

In any case we are definitely not going to rush into implementing this without plenty of prototyping. It may turn out to not be a good idea.

Ryan.

@leeoades
Contributor

Thanks for taking the time to respond, @crozone

If this were an event-driven scenario - where the events come in, adjust the state and cause change events - then I think queuing the state changes would work. This is essentially a read model.

Funnily enough, I built an RX State Machine a looong time ago. In this case, RX itself was responsible for synchronising the calls, and there was no ability to read the state machine. The events coming in only led to events going out.

Any interrogation of the current state by the consumer is unreliable and could lead to unintended behaviour since of course the current state could be behind with queued triggers yet to be processed. The consumer might raise a trigger thinking they were affecting the state in one way but instead they are queuing the trigger and they act against a potentially different state once the trigger is processed.

If the entire interaction with the state machine were atomic, a consumer could interrogate the current state when it was its turn, and triggers fired would take effect immediately. Queuing these interactions at this higher level is safer and more intuitive. Synchronise all traffic not only through the state machine but through the wider context.
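In code, "synchronise the wider context" would mean something like this (a caller-side sketch, nothing Stateless provides; the state and trigger names are invented):

// Interrogating the state and firing the trigger happen under the same lock,
// so no other caller can change the state in between.
private readonly object _machineLock = new object();

public void AbortIfRunning()
{
    lock (_machineLock)
    {
        if (_stateMachine.IsInState(State.Running))
        {
            _stateMachine.Fire(Trigger.Abort);
        }
    }
}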

Why not fork the repo and build your multi-threaded version? I'll be happy to throw some multithreaded unit tests at it.

@gao-artur
Author

Any interrogation of the current state by the consumer is unreliable and could lead to unintended behaviour since of course the current state could be behind with queued triggers yet to be processed. The consumer might raise a trigger thinking they were affecting the state in one way but instead they are queuing the trigger and they act against a potentially different state once the trigger is processed.

This is also true today. If the SM is not encapsulated and its state is exposed to the external caller, they can do the same before queuing the trigger and will suffer from the same problems. But isn't the whole point of the SM that you don't write code like this?

if( currentState == xxx ) { ... }
else if (currentState == yyy) { ... }

And instead rely on SM rules to decide what to do with the trigger in the current state? So this example is just wrong usage of the SM rather than a problem with the SM implementation.
IMO, in proper usage, the SM will always be encapsulated. The only difference is where the trigger serialization happens: in the encapsulating code or in the SM itself.
