diff --git a/devbox.json b/devbox.json index 1e98b0b..aa18f20 100644 --- a/devbox.json +++ b/devbox.json @@ -15,6 +15,10 @@ "test": [ "cd tests/", "./run-tests.sh" + ], + "integration": [ + "cd integration_tests", + "./run-tests.sh" ] } } diff --git a/integration_tests/run-tests.sh b/integration_tests/run-tests.sh new file mode 100755 index 0000000..e1697f7 --- /dev/null +++ b/integration_tests/run-tests.sh @@ -0,0 +1,4 @@ +set -e + +gren make src/Main.gren +node app diff --git a/integration_tests/src/Main.gren b/integration_tests/src/Main.gren index 1ba311e..f3becf5 100644 --- a/integration_tests/src/Main.gren +++ b/integration_tests/src/Main.gren @@ -4,7 +4,8 @@ module Main exposing ( .. ) {-|-} import Test.Crypto as Crypto -import Test.Runner.Effectful +import Test.Task as Task +import Test.Runner.Effectful exposing (concat) import Node @@ -13,5 +14,10 @@ import Node main : Test.Runner.Effectful.Program a main = Node.defineSimpleProgram (\env -> - Test.Runner.Effectful.run env Crypto.tests + Test.Runner.Effectful.run env + (concat + [ Crypto.tests + , Task.tests + ] + ) ) diff --git a/integration_tests/src/Test/Task.gren b/integration_tests/src/Test/Task.gren new file mode 100644 index 0000000..ee14942 --- /dev/null +++ b/integration_tests/src/Test/Task.gren @@ -0,0 +1,193 @@ +module Test.Task exposing (tests) + +{-| -} + +import Array exposing (..) +import Basics exposing (..) +import Bytes exposing (Bytes) +import Bytes.Decode +import Crypto +import Expect exposing (Expectation) +import Math +import Maybe exposing (Maybe(..)) +import Process +import Task exposing (Task) +import Test.Runner.Effectful exposing (await, awaitError, concat, describe, fuzz, run, test) +import Time + + +{-| -} +tests : Test.Runner.Effectful.Test +tests = + -- map2 + -- runs concurrently + -- combines results + -- preserves order + -- fails if one fails + -- nesting runs concurrently + describe "Task.concurrent" + [ let + durations : Array Float + durations = + Array.range 1 1000 |> Array.map toFloat + in + await + (durations + |> Array.map sleep + |> Task.concurrent + |> withDuration + ) + "concurrent array of tasks" + (\res -> + concat + [ test "preserves order of results" + (\_ -> Expect.equal durations res.val) + , test "runs tasks concurrently" + (\_ -> expectDurationLessThan 2000 res) + ] + ) + , await + (Array.repeat 1000000 (sleep 10) + |> Task.concurrent + |> withDuration + ) + "large concurrent array of tasks" + (\res -> + test "completes in reasonable time" + (\_ -> expectDurationLessThan 2000 res) + ) + , awaitError + (Task.concurrent + [ Task.concurrent + [ sleep 100 + , sleep 50 + ] + , Task.concurrent + [ sleep 1000 |> Task.andThen (\_ -> Task.fail "slow boom") + , sleep 100 + ] + , Task.concurrent + [ sleep 50 |> Task.andThen (\_ -> Task.fail "fast boom") + , sleep 100 + ] + ] + ) + "nested concurrent task with a failure" + (\err -> + test "fails the whole task with the error from the first failed task" + (\_ -> Expect.equal "fast boom" err) + ) + , let + nested : Array (Task x (Array a)) -> Task x (Array a) + nested = + Task.concurrent >> Task.map Array.flatten + in + await + (nested + [ Task.concurrent + [ sleep 170 + , sleep 160 + ] + , nested + [ Task.concurrent + [ sleep 180 + , sleep 190 + ] + , nested + [ Task.concurrent + [ sleep 200 + , sleep 200 + ] + , nested + [ Task.concurrent + [ sleep 199 + , sleep 198 + ] + ] + ] + ] + , Task.concurrent + [ sleep 195 + , sleep 180 + ] + ] + |> withDuration + ) + "deeply nested concurrent task" + (\res -> + concat + [ test "runs deeply nested 
concurrent tasks at the same time" + (\_ -> expectDurationLessThan 300 res) + , test "preserves result order" + (\_ -> Expect.equal [ 170, 160, 180, 190, 200, 200, 199, 198, 195, 180 ] res.val) + ] + ) + ] + + +sleep : Float -> Task x Float +sleep ms = + Process.sleep ms |> Task.map (\_ -> ms) + + + +-- Timing helpers + + +type alias Timed a = + { start : Time.Posix + , end : Time.Posix + , val : a + } + + +expectDurationLessThan : Int -> Timed a -> Expectation +expectDurationLessThan ms a = + if duration a < ms then + Expect.pass + + else + Expect.fail + ("Duration was: " + ++ String.fromInt (duration a) + ++ ", Expected less than: " + ++ String.fromInt ms + ++ "ms" + ) + + +duration : Timed a -> Int +duration timed = + Time.posixToMillis timed.end - Time.posixToMillis timed.start + + +withDuration : Task x a -> Task (Timed x) (Timed a) +withDuration task = + Time.now + |> Task.andThen + (\start -> + task + |> Task.onError + (\e -> + Time.now + |> Task.andThen + (\end -> + Task.fail + { start = start + , end = end + , val = e + } + ) + ) + |> Task.andThen + (\a -> + Time.now + |> Task.map + (\end -> + { start = start + , end = end + , val = a + } + ) + ) + ) diff --git a/src/Gren/Kernel/Scheduler.js b/src/Gren/Kernel/Scheduler.js index 994ee6a..feb15ae 100644 --- a/src/Gren/Kernel/Scheduler.js +++ b/src/Gren/Kernel/Scheduler.js @@ -49,6 +49,47 @@ function _Scheduler_receive(callback) { }; } +function _Scheduler_concurrent(tasks) { + if (tasks.length === 0) return _Scheduler_succeed([]); + + return _Scheduler_binding(function (callback) { + let count = 0; + let results = []; + let procs = []; + + function killAll() { + procs.forEach(_Scheduler_rawKill); + } + + function onError(e) { + killAll(); + callback(_Scheduler_fail(e)); + } + + procs = tasks.map((task, i) => { + function onSuccess(res) { + results[i] = res; + count++; + if (count === tasks.length) { + callback(_Scheduler_succeed(results)); + } + } + const success = A2(_Scheduler_andThen, onSuccess, task); + const handled = A2(_Scheduler_onError, onError, success); + return _Scheduler_rawSpawn(handled) + }); + + return killAll; + }); +} + +var _Scheduler_map2 = F3(function (callback, taskA, taskB) { + function combine([resA, resB]) { + return _Scheduler_succeed(A2(callback, resA, resB)); + } + return A2(_Scheduler_andThen, combine, _Scheduler_concurrent([taskA, taskB])); +}); + // PROCESSES var _Scheduler_guid = 0; @@ -87,17 +128,21 @@ var _Scheduler_send = F2(function (proc, msg) { function _Scheduler_kill(proc) { return _Scheduler_binding(function (callback) { - var task = proc.__root; - if (task && task.$ === __1_BINDING && task.__kill) { - task.__kill(); - } - - proc.__root = null; + _Scheduler_rawKill(proc); callback(_Scheduler_succeed({})); }); } +function _Scheduler_rawKill(proc) { + var task = proc.__root; + if (task && task.$ === __1_BINDING && task.__kill) { + task.__kill(); + } + + proc.__root = null; +} + /* STEP PROCESSES type alias Process = @@ -119,8 +164,14 @@ function _Scheduler_enqueue(proc) { return; } _Scheduler_working = true; - while ((proc = _Scheduler_queue.shift())) { - _Scheduler_step(proc); + // Make sure tasks created during _step are run + while (_Scheduler_queue.length > 0) { + const activeProcs = _Scheduler_queue; + _Scheduler_queue = []; + + for (const proc of activeProcs) { + _Scheduler_step(proc); + } } _Scheduler_working = false; } diff --git a/src/Task.gren b/src/Task.gren index 0f595d6..61e1e78 100644 --- a/src/Task.gren +++ b/src/Task.gren @@ -1,7 +1,7 @@ effect module Task where { command = 
MyCmd } exposing ( Task, perform, attempt, execute - , andThen, await, succeed, fail, sequence - , map, map2, map3, map4, map5 + , andThen, await, succeed, fail, sequence, concurrent + , map, andMap, map2, map3 , onError, mapError ) @@ -14,12 +14,12 @@ HTTP requests or writing to a database. ## Chains -@docs andThen, await, succeed, fail, sequence +@docs andThen, await, succeed, fail, sequence, concurrent ## Maps -@docs map, map2, map3, map4, map5 +@docs map, andMap, map2, map3 ## Errors @@ -125,6 +125,32 @@ map func taskA = taskA |> andThen (\a -> succeed (func a)) +{-| Apply the results of two tasks together if they succeed. Both tasks are run concurrently, and if either task fails the whole task fails: + + succeed ((+) 2) + |> andMap (succeed 3) + --> succeed 5 + + fail "oh dear" + |> andMap (succeed 3) + --> fail "oh dear" + +This can be used to do `Task.mapN` for any number of arguments: + + succeed (\a b c d -> a + b + c + d) + |> andMap (succeed 1) + |> andMap (succeed 2) + |> andMap (succeed 3) + |> andMap (succeed 4) + -- succeed 10 + +**NOTE**: for running an array of tasks at the same time see [`concurrent`](#concurrent) + +-} +andMap : Task x a -> Task x (a -> b) -> Task x b +andMap = + map2 (|>) + {-| Put the results of two tasks together. For example, if we wanted to know the current month, we could use [`Time`][time] to ask: @@ -137,78 +163,25 @@ the current month, we could use [`Time`][time] to ask: getMonth = Task.map2 Time.toMonth Time.here Time.now -**Note:** Say we were doing HTTP requests instead. `map2` does each task in -order, so it would try the first request and only continue after it succeeds. -If it fails, the whole thing fails! +**Note:** Say we were doing HTTP requests instead. `map2` starts both requests concurrently +and if either of them fails, the whole thing fails! +If one request fails and the other is still in-flight, the in-flight task will be automatically cancelled and cleaned up. [time]: Time -} map2 : (a -> b -> result) -> Task x a -> Task x b -> Task x result -map2 func taskA taskB = - taskA - |> andThen - (\a -> - taskB - |> andThen (\b -> succeed (func a b)) - ) +map2 = + Gren.Kernel.Scheduler.map2 {-| -} map3 : (a -> b -> c -> result) -> Task x a -> Task x b -> Task x c -> Task x result map3 func taskA taskB taskC = - taskA - |> andThen - (\a -> - taskB - |> andThen - (\b -> - taskC - |> andThen (\c -> succeed (func a b c)) - ) - ) - - -{-| -} -map4 : (a -> b -> c -> d -> result) -> Task x a -> Task x b -> Task x c -> Task x d -> Task x result -map4 func taskA taskB taskC taskD = - taskA - |> andThen - (\a -> - taskB - |> andThen - (\b -> - taskC - |> andThen - (\c -> - taskD - |> andThen (\d -> succeed (func a b c d)) - ) - ) - ) - - -{-| -} -map5 : (a -> b -> c -> d -> e -> result) -> Task x a -> Task x b -> Task x c -> Task x d -> Task x e -> Task x result -map5 func taskA taskB taskC taskD taskE = - taskA - |> andThen - (\a -> - taskB - |> andThen - (\b -> - taskC - |> andThen - (\c -> - taskD - |> andThen - (\d -> - taskE - |> andThen (\e -> succeed (func a b c d e)) - ) - ) - ) - ) + succeed func + |> andMap taskA + |> andMap taskB + |> andMap taskC {-| Start with an array of tasks, and turn them into a single task that returns a @@ -219,8 +192,21 @@ sequence fails. 
 -}
 sequence : Array (Task x a) -> Task x (Array a)
-sequence tasks =
-    Array.foldr (map2 Array.pushFirst) (succeed []) tasks
+sequence =
+    Array.foldr (\task combined -> task |> andThen (\x -> map (Array.pushFirst x) combined)) (succeed [])
+
+
+{-| Start with an array of tasks, and turn them into a single task that returns an
+array. The tasks will be run concurrently, and if any task fails the whole array fails.
+
+    concurrent [ succeed 1, succeed 2 ] == succeed [ 1, 2 ]
+
+Additionally, if any task fails, any tasks already in-flight will be cleaned up (e.g. an in-flight HTTP request will be cancelled).
+
+-}
+concurrent : Array (Task x a) -> Task x (Array a)
+concurrent =
+    Gren.Kernel.Scheduler.concurrent
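
As a rough usage sketch of the new API (not taken from the patch; the module name, the `sleep` helper and the durations are illustrative only), both `concurrent` and an applicative `andMap` chain start all of their tasks at the same time and fail fast on the first error:

    module Example exposing (results, summed)

    import Array exposing (Array)
    import Process
    import Task exposing (Task)


    -- Sleep for the given number of milliseconds, then return that number.
    sleep : Float -> Task x Float
    sleep ms =
        Process.sleep ms |> Task.map (\_ -> ms)


    -- All three sleeps start together, so this resolves after roughly
    -- 300ms (not 600ms), with the results in their original order.
    results : Task x (Array Float)
    results =
        Task.concurrent [ sleep 100, sleep 200, sleep 300 ]


    -- andMap is map2 (|>), and map2 is now backed by the concurrent
    -- kernel primitive, so these three arguments also run at the same time.
    summed : Task x Float
    summed =
        Task.succeed (\a b c -> a + b + c)
            |> Task.andMap (sleep 100)
            |> Task.andMap (sleep 200)
            |> Task.andMap (sleep 300)

If any of these tasks failed, the remaining in-flight ones would be killed via _Scheduler_rawKill, which is the behaviour the nested-failure case in integration_tests/src/Test/Task.gren exercises.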