Skip to content

Concurrent tasks #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ jobs:
- name: Verify formatting
run: devbox run format:check

- name: Run tests
- name: Run unit tests
run: devbox run test

- name: Run integration tests
run: devbox run integration
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.gren
tests/app
integration_tests/app
tests/.gren
*.dat
doc*.json
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ Justin Blake
Jeroen Engels
Joey Bright
Axel Baudot
Andrew MacMurray
4 changes: 4 additions & 0 deletions devbox.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
"test": [
"cd tests/",
"./run-tests.sh"
],
"integration": [
"cd integration_tests",
"./run-tests.sh"
]
}
}
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/run-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set -e

gren make src/Main.gren
node app
10 changes: 8 additions & 2 deletions integration_tests/src/Main.gren
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ module Main exposing ( .. )
{-|-}

import Test.Crypto as Crypto
import Test.Runner.Effectful
import Test.Task as Task
import Test.Runner.Effectful exposing (concat)
import Node


Expand All @@ -13,5 +14,10 @@ import Node
main : Test.Runner.Effectful.Program a
main =
Node.defineSimpleProgram (\env ->
Test.Runner.Effectful.run env Crypto.tests
Test.Runner.Effectful.run env
(concat
[ Crypto.tests
, Task.tests
]
)
)
321 changes: 321 additions & 0 deletions integration_tests/src/Test/Task.gren
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
module Test.Task exposing (tests)

import Array exposing (..)
import Basics exposing (..)
import Bytes exposing (Bytes)
import Bytes.Decode
import Crypto
import Expect exposing (Expectation)
import Math
import Maybe exposing (Maybe(..))
import Process
import Task exposing (Task)
import Test.Runner.Effectful exposing (await, awaitError, concat, describe, fuzz, run, test)
import Time


{-| `Process.sleep` but returns the number of millis taken instead of {}.

This is a convenient way of labelling different `Process.sleep`s for tests.

-}
sleep : Float -> Task x Float
sleep ms =
Process.sleep ms |> Task.map (\_ -> ms)


{-| Use in assertions to allow some startup time.
-}
leeway : number
leeway =
800


{-| Tests for core gren `Task`.
-}
tests : Test.Runner.Effectful.Test
tests =
describe "Task"
[ describe "Task.concurrent"
[ let
durations : Array Float
durations =
Array.range 1 1000 |> Array.map toFloat
in
await
(durations
|> Array.map sleep
|> Task.concurrent
|> withDuration
)
"concurrent array of tasks"
(\res ->
concat
[ test "preserves order of results"
(\_ -> Expect.equal durations res.value)
, test "runs tasks concurrently"
(\_ -> duration res |> Expect.lessThan (1000 + leeway))
]
)
, await
(Array.repeat 1000000 (sleep 10)
|> Task.concurrent
|> withDuration
)
"large concurrent array of tasks"
(\res ->
test "completes in reasonable time"
(\_ -> duration res |> Expect.lessThan (10 + leeway))
)
, awaitError
(Task.concurrent
[ Task.concurrent
[ sleep 10000
, sleep 10000
]
, Task.concurrent
[ sleep 5000 |> Task.andThen (\_ -> Task.fail "slow boom")
, sleep 500
]
, Task.concurrent
[ sleep 1000 |> Task.andThen (\_ -> Task.fail "fast boom")
, sleep 5000
]
]
|> withDuration
)
"concurrent task with deeply nested failure"
(\res ->
concat
[ test "fails the whole task with the error from the first failed task"
(\_ -> Expect.equal "fast boom" res.value)
, test "does not wait for other tasks to finish"
(\_ -> duration res |> Expect.lessThan (1000 + leeway))
]
)
, awaitError
(Task.concurrent
[ Task.fail "immediate boom"
, sleep 10000
, sleep 10000
, sleep 10000
]
|> withDuration
)
"concurrent task with top level failure"
(\res ->
concat
[ test "fails with correct error"
(\_ -> Expect.equal "immediate boom" res.value)
, test "fails immediately"
(\_ -> duration res |> Expect.lessThan leeway)
]
)
, let
nested : Array (Task x (Array a)) -> Task x (Array a)
nested =
Task.concurrent >> Task.map Array.flatten
in
await
(nested
[ Task.concurrent
[ sleep 900
, sleep 800
]
, nested
[ Task.concurrent
[ sleep 900
, sleep 950
]
, nested
[ Task.concurrent
[ sleep 980
, sleep 990
]
, nested
[ Task.concurrent
[ sleep 850
, sleep 880
]
]
]
]
, Task.concurrent
[ sleep 990
, sleep 980
]
]
|> withDuration
)
"deeply nested concurrent task"
(\res ->
concat
[ test "runs deeply nested concurrent tasks at the same time"
(\_ -> duration res |> Expect.lessThan (1000 + leeway))
, test "preserves result order"
(\_ ->
Expect.equal [ 900, 800, 900, 950, 980, 990, 850, 880, 990, 980 ] res.value
)
]
)
, let
tasks : Array (Task x Float)
tasks =
Array.repeat 1000 (sleep 1)
in
await
(Task.map2
(\sequence concurrent ->
{ sequence = sequence
, concurrent = concurrent
}
)
(Task.sequence tasks |> withDuration)
(Task.concurrent tasks |> withDuration)
)
"sequence vs concurrent"
(\res ->
concat
[ test "concurrent is faster than sequence"
(\_ ->
(duration res.sequence - duration res.concurrent)
|> Expect.greaterThan 1000
)
, test "return the same results"
(\_ ->
Expect.equal res.concurrent.value res.sequence.value
)
]
)
]
, describe "Task.map2"
[ await
(Task.map2 (++)
(sleep 3 |> Task.map Array.singleton)
(sleep 2 |> Task.map Array.singleton)
)
"two tasks"
(\res ->
test "combines two results and preserves ordering"
(\_ -> Expect.equal [ 3, 2 ] res)
)
, await
(Task.map2 (+)
(Task.map2 (+)
(sleep 1000)
(Task.map2 (+)
(sleep 1000)
(Task.map2 (+)
(sleep 1000)
(sleep 1000)
)
)
)
(Task.map2 (+)
(sleep 1000)
(Task.map2 (+)
(sleep 1000)
(sleep 1000)
)
)
|> withDuration
)
"deeply nested tasks"
(\res ->
concat
[ test "combines multiple results"
(\_ -> Expect.equal 7000 res.value)
, test "runs tasks concurrently"
(\_ -> duration res |> Expect.lessThan (1000 + leeway))
]
)
, awaitError
(Task.map2 (+)
(Task.map2 (+)
(sleep 10000)
(sleep 10000)
)
(Task.map2 (+)
(sleep 10000 |> Task.andThen (\_ -> Task.fail "slow boom"))
(Task.map2 (+)
(sleep 10000)
(sleep 1000 |> Task.andThen (\_ -> Task.fail "fast boom"))
)
)
|> withDuration
)
"deeply nested task with errors"
(\res ->
concat
[ test "fails the whole task with the error from the first failed task"
(\_ -> Expect.equal "fast boom" res.value)
, test "does not wait for other tasks to finish"
(\_ -> duration res |> Expect.lessThan (1000 + leeway))
]
)
, awaitError
(Task.map2 (+)
(Task.fail "immediate boom")
(Task.map2 (+)
(sleep 10000 |> Task.andThen (\_ -> Task.fail "slow boom"))
(Task.map2 (+)
(sleep 10000)
(sleep 1000 |> Task.andThen (\_ -> Task.fail "fast boom"))
)
)
|> withDuration
)
"deeply nested task with immediate top level error"
(\res ->
concat
[ test "fails with correct error"
(\_ -> Expect.equal "immediate boom" res.value)
, test "fails immediately"
(\_ -> duration res |> Expect.lessThan leeway)
]
)
]
]



-- Task duration helpers


type alias Duration a =
{ start : Time.Posix
, end : Time.Posix
, value : a
}


duration : Duration a -> Int
duration d =
Time.posixToMillis d.end - Time.posixToMillis d.start


withDuration : Task x a -> Task (Duration x) (Duration a)
withDuration task =
Task.await Time.now <|
\start ->
task
|> Task.onError
(\x ->
Task.await Time.now <|
\end ->
Task.fail
{ start = start
, end = end
, value = x
}
)
|> Task.andThen
(\a ->
Task.await Time.now <|
\end ->
Task.succeed
{ start = start
, end = end
, value = a
}
)
Loading