diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 836dfd5e..44bc86ae 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,5 +25,8 @@ jobs: - name: Verify formatting run: devbox run format:check - - name: Run tests + - name: Run unit tests run: devbox run test + + - name: Run integration tests + run: devbox run integration diff --git a/.gitignore b/.gitignore index bbfcf602..ac418bc3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .gren tests/app +integration_tests/app tests/.gren *.dat doc*.json diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 0622d164..3c92d029 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -10,3 +10,4 @@ Justin Blake Jeroen Engels Joey Bright Axel Baudot +Andrew MacMurray diff --git a/devbox.json b/devbox.json index 1e98b0b2..aa18f20c 100644 --- a/devbox.json +++ b/devbox.json @@ -15,6 +15,10 @@ "test": [ "cd tests/", "./run-tests.sh" + ], + "integration": [ + "cd integration_tests", + "./run-tests.sh" ] } } diff --git a/integration_tests/run-tests.sh b/integration_tests/run-tests.sh new file mode 100755 index 00000000..e1697f7f --- /dev/null +++ b/integration_tests/run-tests.sh @@ -0,0 +1,4 @@ +set -e + +gren make src/Main.gren +node app diff --git a/integration_tests/src/Main.gren b/integration_tests/src/Main.gren index 1ba311e0..f3becf52 100644 --- a/integration_tests/src/Main.gren +++ b/integration_tests/src/Main.gren @@ -4,7 +4,8 @@ module Main exposing ( .. ) {-|-} import Test.Crypto as Crypto -import Test.Runner.Effectful +import Test.Task as Task +import Test.Runner.Effectful exposing (concat) import Node @@ -13,5 +14,10 @@ import Node main : Test.Runner.Effectful.Program a main = Node.defineSimpleProgram (\env -> - Test.Runner.Effectful.run env Crypto.tests + Test.Runner.Effectful.run env + (concat + [ Crypto.tests + , Task.tests + ] + ) ) diff --git a/integration_tests/src/Test/Task.gren b/integration_tests/src/Test/Task.gren new file mode 100644 index 00000000..1930714d --- /dev/null +++ b/integration_tests/src/Test/Task.gren @@ -0,0 +1,321 @@ +module Test.Task exposing (tests) + +import Array exposing (..) +import Basics exposing (..) +import Bytes exposing (Bytes) +import Bytes.Decode +import Crypto +import Expect exposing (Expectation) +import Math +import Maybe exposing (Maybe(..)) +import Process +import Task exposing (Task) +import Test.Runner.Effectful exposing (await, awaitError, concat, describe, fuzz, run, test) +import Time + + +{-| `Process.sleep` but returns the number of millis taken instead of {}. + + This is a convenient way of labelling different `Process.sleep`s for tests. + +-} +sleep : Float -> Task x Float +sleep ms = + Process.sleep ms |> Task.map (\_ -> ms) + + +{-| Use in assertions to allow some startup time. +-} +leeway : number +leeway = + 800 + + +{-| Tests for core gren `Task`. +-} +tests : Test.Runner.Effectful.Test +tests = + describe "Task" + [ describe "Task.concurrent" + [ let + durations : Array Float + durations = + Array.range 1 1000 |> Array.map toFloat + in + await + (durations + |> Array.map sleep + |> Task.concurrent + |> withDuration + ) + "concurrent array of tasks" + (\res -> + concat + [ test "preserves order of results" + (\_ -> Expect.equal durations res.value) + , test "runs tasks concurrently" + (\_ -> duration res |> Expect.lessThan (1000 + leeway)) + ] + ) + , await + (Array.repeat 1000000 (sleep 10) + |> Task.concurrent + |> withDuration + ) + "large concurrent array of tasks" + (\res -> + test "completes in reasonable time" + (\_ -> duration res |> Expect.lessThan (10 + leeway)) + ) + , awaitError + (Task.concurrent + [ Task.concurrent + [ sleep 10000 + , sleep 10000 + ] + , Task.concurrent + [ sleep 5000 |> Task.andThen (\_ -> Task.fail "slow boom") + , sleep 500 + ] + , Task.concurrent + [ sleep 1000 |> Task.andThen (\_ -> Task.fail "fast boom") + , sleep 5000 + ] + ] + |> withDuration + ) + "concurrent task with deeply nested failure" + (\res -> + concat + [ test "fails the whole task with the error from the first failed task" + (\_ -> Expect.equal "fast boom" res.value) + , test "does not wait for other tasks to finish" + (\_ -> duration res |> Expect.lessThan (1000 + leeway)) + ] + ) + , awaitError + (Task.concurrent + [ Task.fail "immediate boom" + , sleep 10000 + , sleep 10000 + , sleep 10000 + ] + |> withDuration + ) + "concurrent task with top level failure" + (\res -> + concat + [ test "fails with correct error" + (\_ -> Expect.equal "immediate boom" res.value) + , test "fails immediately" + (\_ -> duration res |> Expect.lessThan leeway) + ] + ) + , let + nested : Array (Task x (Array a)) -> Task x (Array a) + nested = + Task.concurrent >> Task.map Array.flatten + in + await + (nested + [ Task.concurrent + [ sleep 900 + , sleep 800 + ] + , nested + [ Task.concurrent + [ sleep 900 + , sleep 950 + ] + , nested + [ Task.concurrent + [ sleep 980 + , sleep 990 + ] + , nested + [ Task.concurrent + [ sleep 850 + , sleep 880 + ] + ] + ] + ] + , Task.concurrent + [ sleep 990 + , sleep 980 + ] + ] + |> withDuration + ) + "deeply nested concurrent task" + (\res -> + concat + [ test "runs deeply nested concurrent tasks at the same time" + (\_ -> duration res |> Expect.lessThan (1000 + leeway)) + , test "preserves result order" + (\_ -> + Expect.equal [ 900, 800, 900, 950, 980, 990, 850, 880, 990, 980 ] res.value + ) + ] + ) + , let + tasks : Array (Task x Float) + tasks = + Array.repeat 1000 (sleep 1) + in + await + (Task.map2 + (\sequence concurrent -> + { sequence = sequence + , concurrent = concurrent + } + ) + (Task.sequence tasks |> withDuration) + (Task.concurrent tasks |> withDuration) + ) + "sequence vs concurrent" + (\res -> + concat + [ test "concurrent is faster than sequence" + (\_ -> + (duration res.sequence - duration res.concurrent) + |> Expect.greaterThan 1000 + ) + , test "return the same results" + (\_ -> + Expect.equal res.concurrent.value res.sequence.value + ) + ] + ) + ] + , describe "Task.map2" + [ await + (Task.map2 (++) + (sleep 3 |> Task.map Array.singleton) + (sleep 2 |> Task.map Array.singleton) + ) + "two tasks" + (\res -> + test "combines two results and preserves ordering" + (\_ -> Expect.equal [ 3, 2 ] res) + ) + , await + (Task.map2 (+) + (Task.map2 (+) + (sleep 1000) + (Task.map2 (+) + (sleep 1000) + (Task.map2 (+) + (sleep 1000) + (sleep 1000) + ) + ) + ) + (Task.map2 (+) + (sleep 1000) + (Task.map2 (+) + (sleep 1000) + (sleep 1000) + ) + ) + |> withDuration + ) + "deeply nested tasks" + (\res -> + concat + [ test "combines multiple results" + (\_ -> Expect.equal 7000 res.value) + , test "runs tasks concurrently" + (\_ -> duration res |> Expect.lessThan (1000 + leeway)) + ] + ) + , awaitError + (Task.map2 (+) + (Task.map2 (+) + (sleep 10000) + (sleep 10000) + ) + (Task.map2 (+) + (sleep 10000 |> Task.andThen (\_ -> Task.fail "slow boom")) + (Task.map2 (+) + (sleep 10000) + (sleep 1000 |> Task.andThen (\_ -> Task.fail "fast boom")) + ) + ) + |> withDuration + ) + "deeply nested task with errors" + (\res -> + concat + [ test "fails the whole task with the error from the first failed task" + (\_ -> Expect.equal "fast boom" res.value) + , test "does not wait for other tasks to finish" + (\_ -> duration res |> Expect.lessThan (1000 + leeway)) + ] + ) + , awaitError + (Task.map2 (+) + (Task.fail "immediate boom") + (Task.map2 (+) + (sleep 10000 |> Task.andThen (\_ -> Task.fail "slow boom")) + (Task.map2 (+) + (sleep 10000) + (sleep 1000 |> Task.andThen (\_ -> Task.fail "fast boom")) + ) + ) + |> withDuration + ) + "deeply nested task with immediate top level error" + (\res -> + concat + [ test "fails with correct error" + (\_ -> Expect.equal "immediate boom" res.value) + , test "fails immediately" + (\_ -> duration res |> Expect.lessThan leeway) + ] + ) + ] + ] + + + +-- Task duration helpers + + +type alias Duration a = + { start : Time.Posix + , end : Time.Posix + , value : a + } + + +duration : Duration a -> Int +duration d = + Time.posixToMillis d.end - Time.posixToMillis d.start + + +withDuration : Task x a -> Task (Duration x) (Duration a) +withDuration task = + Task.await Time.now <| + \start -> + task + |> Task.onError + (\x -> + Task.await Time.now <| + \end -> + Task.fail + { start = start + , end = end + , value = x + } + ) + |> Task.andThen + (\a -> + Task.await Time.now <| + \end -> + Task.succeed + { start = start + , end = end + , value = a + } + ) diff --git a/src/Gren/Kernel/Scheduler.js b/src/Gren/Kernel/Scheduler.js index 994ee6a0..2d12e9de 100644 --- a/src/Gren/Kernel/Scheduler.js +++ b/src/Gren/Kernel/Scheduler.js @@ -49,6 +49,47 @@ function _Scheduler_receive(callback) { }; } +function _Scheduler_concurrent(tasks) { + if (tasks.length === 0) return _Scheduler_succeed([]); + + return _Scheduler_binding(function (callback) { + let count = 0; + let results = new Array(tasks.length); + let procs; + + function killAll() { + procs.forEach(_Scheduler_rawKill); + } + + function onError(e) { + killAll(); + callback(_Scheduler_fail(e)); + } + + procs = tasks.map((task, i) => { + function onSuccess(res) { + results[i] = res; + count++; + if (count === tasks.length) { + callback(_Scheduler_succeed(results)); + } + } + const success = A2(_Scheduler_andThen, onSuccess, task); + const handled = A2(_Scheduler_onError, onError, success); + return _Scheduler_rawSpawn(handled); + }); + + return killAll; + }); +} + +var _Scheduler_map2 = F3(function (callback, taskA, taskB) { + function combine([resA, resB]) { + return _Scheduler_succeed(A2(callback, resA, resB)); + } + return A2(_Scheduler_andThen, combine, _Scheduler_concurrent([taskA, taskB])); +}); + // PROCESSES var _Scheduler_guid = 0; @@ -87,17 +128,21 @@ var _Scheduler_send = F2(function (proc, msg) { function _Scheduler_kill(proc) { return _Scheduler_binding(function (callback) { - var task = proc.__root; - if (task && task.$ === __1_BINDING && task.__kill) { - task.__kill(); - } - - proc.__root = null; + _Scheduler_rawKill(proc); callback(_Scheduler_succeed({})); }); } +function _Scheduler_rawKill(proc) { + var task = proc.__root; + if (task && task.$ === __1_BINDING && task.__kill) { + task.__kill(); + } + + proc.__root = null; +} + /* STEP PROCESSES type alias Process = @@ -119,8 +164,14 @@ function _Scheduler_enqueue(proc) { return; } _Scheduler_working = true; - while ((proc = _Scheduler_queue.shift())) { - _Scheduler_step(proc); + // Make sure tasks created during _step are run + while (_Scheduler_queue.length > 0) { + const activeProcs = _Scheduler_queue; + _Scheduler_queue = []; + + for (const proc of activeProcs) { + _Scheduler_step(proc); + } } _Scheduler_working = false; } diff --git a/src/Task.gren b/src/Task.gren index 0f595d68..3dd223b7 100644 --- a/src/Task.gren +++ b/src/Task.gren @@ -1,7 +1,7 @@ effect module Task where { command = MyCmd } exposing ( Task, perform, attempt, execute - , andThen, await, succeed, fail, sequence - , map, map2, map3, map4, map5 + , andThen, await, succeed, fail, sequence, concurrent + , map, map2, map3, andMap , onError, mapError ) @@ -14,12 +14,12 @@ HTTP requests or writing to a database. ## Chains -@docs andThen, await, succeed, fail, sequence +@docs andThen, await, succeed, fail, sequence, concurrent ## Maps -@docs map, map2, map3, map4, map5 +@docs map, map2, map3, andMap ## Errors @@ -137,78 +137,53 @@ the current month, we could use [`Time`][time] to ask: getMonth = Task.map2 Time.toMonth Time.here Time.now -**Note:** Say we were doing HTTP requests instead. `map2` does each task in -order, so it would try the first request and only continue after it succeeds. -If it fails, the whole thing fails! +**Note:** Say we were doing HTTP requests instead. `map2` starts both requests concurrently +and if either of them fails, the whole thing fails! +If one request fails and the other is still in-flight, the in-flight task will be automatically cancelled and cleaned up. [time]: Time -} map2 : (a -> b -> result) -> Task x a -> Task x b -> Task x result -map2 func taskA taskB = - taskA - |> andThen - (\a -> - taskB - |> andThen (\b -> succeed (func a b)) - ) +map2 = + Gren.Kernel.Scheduler.map2 {-| -} map3 : (a -> b -> c -> result) -> Task x a -> Task x b -> Task x c -> Task x result map3 func taskA taskB taskC = - taskA - |> andThen - (\a -> - taskB - |> andThen - (\b -> - taskC - |> andThen (\c -> succeed (func a b c)) - ) - ) + succeed func + |> andMap taskA + |> andMap taskB + |> andMap taskC -{-| -} -map4 : (a -> b -> c -> d -> result) -> Task x a -> Task x b -> Task x c -> Task x d -> Task x result -map4 func taskA taskB taskC taskD = - taskA - |> andThen - (\a -> - taskB - |> andThen - (\b -> - taskC - |> andThen - (\c -> - taskD - |> andThen (\d -> succeed (func a b c d)) - ) - ) - ) +{-| Apply the results of two tasks together if they succeed. Both tasks are run concurrently, and if either task fails the whole task fails: + succeed ((+) 2) + |> andMap (succeed 3) + --> succeed 5 + + succeed ((+) 2) + |> andMap (fail "oh dear") + --> fail "oh dear" + +This can be used to do `Task.mapN` for any number of arguments - useful when `map2` or `map3` isn't enough: + + succeed (\a b c d -> a + b + c + d) + |> andMap (succeed 1) + |> andMap (succeed 2) + |> andMap (succeed 3) + |> andMap (succeed 4) + -- succeed 10 + +**NOTE**: for running an array of tasks at the same time see [`concurrent`](#concurrent). + +-} +andMap : Task x a -> Task x (a -> b) -> Task x b +andMap = + map2 (|>) -{-| -} -map5 : (a -> b -> c -> d -> e -> result) -> Task x a -> Task x b -> Task x c -> Task x d -> Task x e -> Task x result -map5 func taskA taskB taskC taskD taskE = - taskA - |> andThen - (\a -> - taskB - |> andThen - (\b -> - taskC - |> andThen - (\c -> - taskD - |> andThen - (\d -> - taskE - |> andThen (\e -> succeed (func a b c d e)) - ) - ) - ) - ) {-| Start with an array of tasks, and turn them into a single task that returns a @@ -219,8 +194,34 @@ sequence fails. -} sequence : Array (Task x a) -> Task x (Array a) -sequence tasks = - Array.foldr (map2 Array.pushFirst) (succeed []) tasks +sequence = + Array.foldr (\task combined -> task |> andThen (\x -> map (Array.pushFirst x) combined)) (succeed []) + + +{-| Start with an array of tasks, and turn them into a single task that returns a +array. The tasks will be run concurrently and if any task fails the whole array fails. + + concurrent [ succeed 1, succeed 2 ] == succeed [ 1, 2 ] + +Additionally if any task fails, any tasks already in-flight will be cleaned up (e.g. an in-flgiht http request will be cancelled). + +**Note**: Why use `sequence` over `concurrent`? Maybe you have an expensive operation, like making 100 http requests. +You might want to do these sequentially rather than all at once to avoid overwhelming the server. + +**A note on concurrency and parallelism**: + +Why is there only `concurrent` and not `parallel`? Because JavaScript is single threaded, it's not possible currently to have true parallelism in gren. + +- A parallel task : multiple subtasks can be run at exactly the same time on multiple processors or CPU cores. +- A concurrent task : multiple subtasks can be in progress at the same time but not executing at exactly the same time. + +In practice this means CPU bound tasks (e.g. generate a cryptographic hash) won't see a speed boost from `concurrent`. +IO bound tasks however (e.g. make a http request and wait until the response comes back) may get a noticable speed boost from `concurrent`. + +-} +concurrent : Array (Task x a) -> Task x (Array a) +concurrent = + Gren.Kernel.Scheduler.concurrent