Skip to content

WIP: Concurrent tasks #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions devbox.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
"test": [
"cd tests/",
"./run-tests.sh"
],
"integration": [
"cd integration_tests",
"./run-tests.sh"
]
}
}
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/run-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set -e

gren make src/Main.gren
node app
10 changes: 8 additions & 2 deletions integration_tests/src/Main.gren
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ module Main exposing ( .. )
{-|-}

import Test.Crypto as Crypto
import Test.Runner.Effectful
import Test.Task as Task
import Test.Runner.Effectful exposing (concat)
import Node


Expand All @@ -13,5 +14,10 @@ import Node
main : Test.Runner.Effectful.Program a
main =
Node.defineSimpleProgram (\env ->
Test.Runner.Effectful.run env Crypto.tests
Test.Runner.Effectful.run env
(concat
[ Crypto.tests
, Task.tests
]
)
)
193 changes: 193 additions & 0 deletions integration_tests/src/Test/Task.gren
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
module Test.Task exposing (tests)

{-| -}

import Array exposing (..)
import Basics exposing (..)
import Bytes exposing (Bytes)
import Bytes.Decode
import Crypto
import Expect exposing (Expectation)
import Math
import Maybe exposing (Maybe(..))
import Process
import Task exposing (Task)
import Test.Runner.Effectful exposing (await, awaitError, concat, describe, fuzz, run, test)
import Time


{-| -}
tests : Test.Runner.Effectful.Test
tests =
-- map2
-- runs concurrently
-- combines results
-- preserves order
-- fails if one fails
-- nesting runs concurrently
describe "Task.concurrent"
[ let
durations : Array Float
durations =
Array.range 1 1000 |> Array.map toFloat
in
await
(durations
|> Array.map sleep
|> Task.concurrent
|> withDuration
)
"concurrent array of tasks"
(\res ->
concat
[ test "preserves order of results"
(\_ -> Expect.equal durations res.val)
, test "runs tasks concurrently"
(\_ -> expectDurationLessThan 2000 res)
]
)
, await
(Array.repeat 1000000 (sleep 10)
|> Task.concurrent
|> withDuration
)
"large concurrent array of tasks"
(\res ->
test "completes in reasonable time"
(\_ -> expectDurationLessThan 2000 res)
)
, awaitError
(Task.concurrent
[ Task.concurrent
[ sleep 100
, sleep 50
]
, Task.concurrent
[ sleep 1000 |> Task.andThen (\_ -> Task.fail "slow boom")
, sleep 100
]
, Task.concurrent
[ sleep 50 |> Task.andThen (\_ -> Task.fail "fast boom")
, sleep 100
]
]
)
"nested concurrent task with a failure"
(\err ->
test "fails the whole task with the error from the first failed task"
(\_ -> Expect.equal "fast boom" err)
)
, let
nested : Array (Task x (Array a)) -> Task x (Array a)
nested =
Task.concurrent >> Task.map Array.flatten
in
await
(nested
[ Task.concurrent
[ sleep 170
, sleep 160
]
, nested
[ Task.concurrent
[ sleep 180
, sleep 190
]
, nested
[ Task.concurrent
[ sleep 200
, sleep 200
]
, nested
[ Task.concurrent
[ sleep 199
, sleep 198
]
]
]
]
, Task.concurrent
[ sleep 195
, sleep 180
]
]
|> withDuration
)
"deeply nested concurrent task"
(\res ->
concat
[ test "runs deeply nested concurrent tasks at the same time"
(\_ -> expectDurationLessThan 300 res)
, test "preserves result order"
(\_ -> Expect.equal [ 170, 160, 180, 190, 200, 200, 199, 198, 195, 180 ] res.val)
]
)
]


sleep : Float -> Task x Float
sleep ms =
Process.sleep ms |> Task.map (\_ -> ms)



-- Timing helpers


type alias Timed a =
{ start : Time.Posix
, end : Time.Posix
, val : a
}


expectDurationLessThan : Int -> Timed a -> Expectation
expectDurationLessThan ms a =
if duration a < ms then
Expect.pass

else
Expect.fail
("Duration was: "
++ String.fromInt (duration a)
++ ", Expected less than: "
++ String.fromInt ms
++ "ms"
)


duration : Timed a -> Int
duration timed =
Time.posixToMillis timed.end - Time.posixToMillis timed.start


withDuration : Task x a -> Task (Timed x) (Timed a)
withDuration task =
Time.now
|> Task.andThen
(\start ->
task
|> Task.onError
(\e ->
Time.now
|> Task.andThen
(\end ->
Task.fail
{ start = start
, end = end
, val = e
}
)
)
|> Task.andThen
(\a ->
Time.now
|> Task.map
(\end ->
{ start = start
, end = end
, val = a
}
)
)
)
67 changes: 59 additions & 8 deletions src/Gren/Kernel/Scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,47 @@ function _Scheduler_receive(callback) {
};
}

function _Scheduler_concurrent(tasks) {
if (tasks.length === 0) return _Scheduler_succeed([]);

return _Scheduler_binding(function (callback) {
let count = 0;
let results = [];
let procs = [];

function killAll() {
procs.forEach(_Scheduler_rawKill);
}

function onError(e) {
killAll();
callback(_Scheduler_fail(e));
}

procs = tasks.map((task, i) => {
function onSuccess(res) {
results[i] = res;
count++;
if (count === tasks.length) {
callback(_Scheduler_succeed(results));
}
}
const success = A2(_Scheduler_andThen, onSuccess, task);
const handled = A2(_Scheduler_onError, onError, success);
return _Scheduler_rawSpawn(handled)
});

return killAll;
});
}

var _Scheduler_map2 = F3(function (callback, taskA, taskB) {
function combine([resA, resB]) {
return _Scheduler_succeed(A2(callback, resA, resB));
}
return A2(_Scheduler_andThen, combine, _Scheduler_concurrent([taskA, taskB]));
});

// PROCESSES

var _Scheduler_guid = 0;
Expand Down Expand Up @@ -87,17 +128,21 @@ var _Scheduler_send = F2(function (proc, msg) {

function _Scheduler_kill(proc) {
return _Scheduler_binding(function (callback) {
var task = proc.__root;
if (task && task.$ === __1_BINDING && task.__kill) {
task.__kill();
}

proc.__root = null;
_Scheduler_rawKill(proc);

callback(_Scheduler_succeed({}));
});
}

function _Scheduler_rawKill(proc) {
var task = proc.__root;
if (task && task.$ === __1_BINDING && task.__kill) {
task.__kill();
}

proc.__root = null;
}

/* STEP PROCESSES

type alias Process =
Expand All @@ -119,8 +164,14 @@ function _Scheduler_enqueue(proc) {
return;
}
_Scheduler_working = true;
while ((proc = _Scheduler_queue.shift())) {
_Scheduler_step(proc);
// Make sure tasks created during _step are run
while (_Scheduler_queue.length > 0) {
const activeProcs = _Scheduler_queue;
_Scheduler_queue = [];

for (const proc of activeProcs) {
_Scheduler_step(proc);
}
}
_Scheduler_working = false;
}
Expand Down
Loading