Skip to content

Commit

Permalink
feat: special cases (#59)
Browse files Browse the repository at this point in the history
- **feat: support persistent trigger pattern**
- **feat: support transient trigger pattern**
- **feat: support transition without token consumed**
- **feat: support transition without token produced**
- **feat: calibrate the state for the complete_e transition**
  • Loading branch information
fahchen authored Jan 3, 2025
1 parent 36fe457 commit 56c1493
Show file tree
Hide file tree
Showing 10 changed files with 796 additions and 78 deletions.
22 changes: 10 additions & 12 deletions lib/coloured_flow/runner/enactment/enactment.ex
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,15 @@ defmodule ColouredFlow.Runner.Enactment do
calibration = WorkitemCalibration.calibrate(state, transition, options)
state = apply_calibration(calibration)

case transition do
:complete ->
cpnet = Storage.get_flow_by_enactment(state.enactment_id)
# try to terminate when the transition is `:complete`
case check_termination(state, cpnet) do
:stop -> {:stop, :normal, state}
:cont -> {:noreply, state}
end

_other ->
{:noreply, state}
if transition in [:complete, :complete_e] do
cpnet = Storage.get_flow_by_enactment(state.enactment_id)
# try to terminate when the transition is `:complete` or `:complete_e`
case check_termination(state, cpnet) do
:stop -> {:stop, :normal, state}
:cont -> {:noreply, state}
end
else
{:noreply, state}
end
end

Expand Down Expand Up @@ -319,7 +317,7 @@ defmodule ColouredFlow.Runner.Enactment do
:ok,
{
{completed_workitems, new_state},
{:continue, {:calibrate_workitems, :complete, calibration_options}}
{:continue, {:calibrate_workitems, transition_action, calibration_options}}
},
%{workitems: completed_workitems}
}
Expand Down
97 changes: 74 additions & 23 deletions lib/coloured_flow/runner/enactment/workitem_calibration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemCalibration do

typed_structor enforce: true do
field :state, enactment_state()
field :to_withdraw, [Workitem.t(:withdrawn)], default: []
field :to_produce, MultiSet.t(BindingElement.t()), default: []
field :to_withdraw, [Workitem.t(:enabled | :started)], default: []
field :to_produce, MultiSet.t(BindingElement.t()), default: MultiSet.new()
end

@doc """
Expand Down Expand Up @@ -78,26 +78,68 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemCalibration do
* `transition` : The transition that caused the calibration. See at `ColouredFlow.Runner.Enactment.Workitem.__transitions__/0`
* `options` : The transition specefied options. See below options.
## Start options
## `start` transition options
* `workitems`: The workitems (after the transition) that are affected by the `start` transition.
* `workitems`: The workitems (after the transition, in `started` state) that are affected by the `start` transition.
## Complete options
## `complete` transition options
* `cpnet`: The coloured petri net.
* `workitem_occurrences`: The workitem and occurrence pairs that are appened after the `complete` transition.
## `complete_e` transition options
* `cpnet`: The coloured petri net.
* `workitem_occurrences`: The workitem and occurrence pairs that are appened after the `complete_e` transition.
"""
@spec calibrate(enactment_state(), :start, workitems: [Workitem.t()]) :: t()
@spec calibrate(enactment_state(), :start, workitems: [Workitem.t(:started)]) :: t()
@spec calibrate(enactment_state(), :complete,
cpnet: ColouredPetriNet.t(),
occurrences: [Occurrence.t()]
workitem_occurrences: [{Workitem.t(:completed), Occurrence.t()}]
) :: t()
@spec calibrate(enactment_state(), :complete_e,
cpnet: ColouredPetriNet.t(),
workitem_occurrences: [{Workitem.t(:completed), Occurrence.t()}]
) :: t()
def calibrate(state, transition, options)

def calibrate(%Enactment{} = state, :start, options)
when is_list(options) do
workitems = Keyword.fetch!(options, :workitems)
to_consume_markings = Enum.flat_map(workitems, & &1.binding_element.to_consume)

{state, to_withdraw} = withdraw_workitems(state, to_consume_markings)
struct!(__MODULE__, state: state, to_withdraw: to_withdraw)
end

def calibrate(%Enactment{} = state, :complete, options)
when is_list(options) do
cpnet = Keyword.fetch!(options, :cpnet)
workitem_occurrences = Keyword.fetch!(options, :workitem_occurrences)

{state, to_produce} = produce_workitems(state, workitem_occurrences, cpnet)
struct!(__MODULE__, state: state, to_produce: to_produce)
end

def calibrate(%Enactment{} = state, :complete_e, options)
when is_list(options) do
cpnet = Keyword.fetch!(options, :cpnet)
workitem_occurrences = Keyword.fetch!(options, :workitem_occurrences)

to_consume_markings =
Enum.flat_map(workitem_occurrences, fn {workitem, _} ->
workitem.binding_element.to_consume
end)

{state, to_withdraw} = withdraw_workitems(state, to_consume_markings)
{state, to_produce} = produce_workitems(state, workitem_occurrences, cpnet)

struct!(__MODULE__, state: state, to_withdraw: to_withdraw, to_produce: to_produce)
end

@spec withdraw_workitems(enactment_state(), to_consume_markings :: [Marking.t()]) ::
{enactment_state(), [Workitem.t(:withdrawn)]}
defp withdraw_workitems(%Enactment{} = state, to_consume_markings) do
place_tokens = Map.new(state.markings, fn {place, marking} -> {place, marking.tokens} end)
place_tokens = consume_markings(to_consume_markings, place_tokens)

Expand All @@ -112,17 +154,15 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemCalibration do
true
end)

struct!(
__MODULE__,
state: %Enactment{state | workitems: workitems},
to_withdraw: Map.values(to_withdraw)
)
{%Enactment{state | workitems: workitems}, Map.values(to_withdraw)}
end

def calibrate(%Enactment{} = state, :complete, options)
when is_list(options) do
cpnet = Keyword.fetch!(options, :cpnet)
workitem_occurrences = Keyword.fetch!(options, :workitem_occurrences)
@spec produce_workitems(
enactment_state(),
workitem_occurrences :: [{Workitem.t(:completed), Occurrence.t()}],
ColouredPetriNet.t()
) :: {enactment_state(), MultiSet.t(BindingElement.t())}
defp produce_workitems(%Enactment{} = state, workitem_occurrences, %ColouredPetriNet{} = cpnet) do
completed_workitem_ids = Enum.map(workitem_occurrences, fn {workitem, _} -> workitem.id end)
occurrences = Enum.map(workitem_occurrences, &elem(&1, 1))

Expand All @@ -138,8 +178,17 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemCalibration do
# find affected transitions, and then find the binding elements
binding_elements =
occurrences
|> Stream.flat_map(& &1.to_produce)
|> Enum.map(& &1.place)
|> Enum.flat_map(fn %Occurrence{} = occurrence ->
occurrence.binding_element
|> consume_zero_tokens?()
|> if do
# if the binding element consumes zero tokens, then it should re-check enablement
Stream.map(occurrence.binding_element.to_consume, & &1.place)
else
[]
end
|> Stream.concat(Stream.map(occurrence.to_produce, & &1.place))
end)
|> Utils.list_transitions(cpnet)
|> Enum.flat_map(fn transition ->
Computation.list(transition, cpnet, available_markings)
Expand All @@ -163,11 +212,7 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemCalibration do
Map.drop(workitems, completed_workitem_ids)
end)

struct!(
__MODULE__,
state: state,
to_produce: MultiSet.to_list(to_produce)
)
{state, to_produce}
end

@spec consume_markings(to_consume_markings :: [Marking.t()], place_tokens) :: place_tokens
Expand Down Expand Up @@ -214,6 +259,12 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemCalibration do
markings
end

defp consume_zero_tokens?(%BindingElement{} = binding_element) do
require MultiSet

Enum.all?(binding_element.to_consume, &MultiSet.is_empty(&1.tokens))
end

@spec in_progress_workitems_filter() :: (Workitem.t() -> boolean())
defp in_progress_workitems_filter do
in_progress_states = Workitem.__in_progress_states__()
Expand Down
41 changes: 15 additions & 26 deletions lib/coloured_flow/runner/enactment/workitem_consumption.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,8 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemConsumption do
| {:error, {:unsufficient_tokens, Marking.t()}}
def consume_tokens(place_markings, binding_elements)

def consume_tokens(place_markings, []) when is_map(place_markings), do: {:ok, place_markings}

def consume_tokens(place_markings, [%BindingElement{} = binding_element | _rest])
when map_size(place_markings) == 0 do
{:error, {:unsufficient_tokens, binding_element.to_consume}}
end
def consume_tokens(place_markings, []) when is_map(place_markings),
do: {:ok, place_markings}

def consume_tokens(place_markings, binding_elements)
when is_map(place_markings) and is_list(binding_elements) do
Expand All @@ -68,30 +64,23 @@ defmodule ColouredFlow.Runner.Enactment.WorkitemConsumption do
|> Enum.reduce(%{}, fn %Marking{} = marking, acc ->
Map.update(acc, marking.place, marking.tokens, &MultiSet.union(&1, marking.tokens))
end)
|> Enum.reduce_while(place_markings, fn {place, to_consume_tokens}, acc ->
place_marking =
case Map.fetch(acc, place) do
{:ok, marking} ->
marking
|> Enum.reduce_while(place_markings, fn
{place, to_consume_tokens}, acc when is_map_key(acc, place) ->
place_marking = Map.fetch!(acc, place)

:error ->
raise """
The place (#{place}) making is absent in the given place markings.
There may be an issue on markings and workitems in the corresponding enactment state,
we should let it crash and restart the process to resolve it.
"""
end
case MultiSet.safe_difference(place_marking.tokens, to_consume_tokens) do
{:ok, remaining_tokens} when MultiSet.is_empty(remaining_tokens) ->
{:cont, Map.delete(acc, place)}

case MultiSet.safe_difference(place_marking.tokens, to_consume_tokens) do
{:ok, remaining_tokens} when MultiSet.is_empty(remaining_tokens) ->
{:cont, Map.delete(acc, place)}
{:ok, remaining_tokens} ->
{:cont, Map.put(acc, place, %Marking{place_marking | tokens: remaining_tokens})}

{:ok, remaining_tokens} ->
{:cont, Map.put(acc, place, %Marking{place_marking | tokens: remaining_tokens})}
:error ->
{:halt, {:error, {:unsufficient_tokens, place_marking}}}
end

:error ->
{:halt, {:error, {:unsufficient_tokens, place_marking}}}
end
{place, _to_consume_tokens}, _acc ->
{:halt, {:error, {:unsufficient_tokens, %Marking{place: place, tokens: []}}}}
end)
|> case do
{:error, _reason} = error -> error
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
defmodule ColouredFlow.Runner.Enactment.IntegrationTests.TransitionWithoutInputPlacesTest do
use ColouredFlow.RepoCase
use ColouredFlow.RunnerHelpers

alias ColouredFlow.Enactment.BindingElement

setup do
use ColouredFlow.DefinitionHelpers

import ColouredFlow.Notation

# ```mermaid
# flowchart TB
# %% The produce_trigger is always enabled.
#
# %% colset int() :: integer()
#
# t((trigger))
#
# pt[produce_trigger]
#
# pt --{1,1}--> t
# ```
cpnet =
%ColouredPetriNet{
colour_sets: [
colset(int() :: integer())
],
places: [
%Place{name: "trigger", colour_set: :int}
],
transitions: [
build_transition!(name: "produce_trigger")
],
arcs: [
arc(produce_trigger ~> trigger :: "{1, 1}")
]
}

%{cpnet: cpnet}
end

describe "validates definition" do
test "works", %{cpnet: cpnet} do
assert {:ok, _cpnet} =
cpnet
|> ColouredFlow.Builder.build()
|> ColouredFlow.Validators.run()
end
end

describe "enactment" do
setup :setup_flow
setup :setup_enactment
setup :start_enactment

@tag initial_markings: []
test "works", %{enactment: enactment, enactment_server: enactment_server} do
[
%Enactment.Workitem{
state: :enabled,
binding_element: %BindingElement{
transition: "produce_trigger",
binding: [],
to_consume: []
}
} = workitem
] = get_enactment_workitems(enactment_server)

workitem = start_workitem(workitem, enactment_server)

{:ok, _workitems} =
GenServer.call(
enactment_server,
{:complete_workitems, %{workitem.id => []}}
)

wait_enactment_to_stop!(enactment_server)

assert %Schemas.EnactmentLog{
state: :terminated,
termination: %Schemas.EnactmentLog.Termination{
type: :implicit,
message: nil
}
} = Repo.get_by!(Schemas.EnactmentLog, enactment_id: enactment.id)

assert %Schemas.Enactment{
state: :terminated,
final_markings: [
%Marking{place: "trigger", tokens: ~MS[1]}
]
} = Repo.get!(Schemas.Enactment, enactment.id)
end
end
end
Loading

0 comments on commit 56c1493

Please sign in to comment.