Skip to content

Commit 31482d5

Browse files
committed
[work-pool] Add work-pool subproject
* This is an extraction from my mutant-manager project.
1 parent e4ec728 commit 31482d5

File tree

9 files changed

+580
-0
lines changed

9 files changed

+580
-0
lines changed

stack-9.4.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ flags:
7171
development: true
7272
tasty-mgolden:
7373
development: true
74+
work-pool:
75+
development: true
7476
xray:
7577
development: true
7678
packages:
@@ -97,4 +99,5 @@ packages:
9799
- source-constraints
98100
- stack-deploy
99101
- tasty-mgolden
102+
- work-pool
100103
- xray

stack-9.6.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ flags:
6363
development: true
6464
tasty-mgolden:
6565
development: true
66+
work-pool:
67+
development: true
6668
xray:
6769
development: true
6870
packages:
@@ -89,4 +91,5 @@ packages:
8991
- source-constraints
9092
- stack-deploy
9193
- tasty-mgolden
94+
- work-pool
9295
- xray

work-pool/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# work-pool
2+
3+
Minimal fixed max size, fixed worker count work pool with per worker boot and dynamic job production.

work-pool/package.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
_common/package: !include "../common/package.yaml"
2+
3+
name: work-pool
4+
synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot.
5+
homepage: https://github.com/mbj/mhs#readme
6+
github: mbj/mhs
7+
version: 0.0.1
8+
9+
<<: *defaults
10+
11+
dependencies:
12+
- base
13+
- mprelude
14+
- text
15+
- unliftio
16+
17+
tests:
18+
test:
19+
<<: *test
20+
dependencies:
21+
- containers
22+
- devtools
23+
- tasty
24+
- tasty-hunit
25+
- work-pool

work-pool/src/WorkPool.hs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
module WorkPool
2+
( Config(..)
3+
, Pool
4+
, pushJob
5+
, runPool
6+
)
7+
where
8+
9+
import MPrelude
10+
import Prelude (succ)
11+
12+
import qualified UnliftIO.Async as UnliftIO
13+
import qualified UnliftIO.STM as UnliftIO
14+
15+
-- | Worker pool configuration
16+
data Config a = Config
17+
{ produceJobs :: forall m . MonadUnliftIO m => Pool a -> m ()
18+
-- ^ function called from the main thread producing work, use `pushJob` to
19+
-- create workable jobs.
20+
, queueSize :: Natural
21+
-- ^ maximum size of the jobs queued
22+
, workerCount :: Natural
23+
-- ^ number of workers to boot
24+
, workerRun :: forall m . MonadUnliftIO m => Natural -> m (a -> m ())
25+
-- ^ function called when a worker is booted, argument is the worker index,
26+
-- returns an action to be called per job assigned to this worker.
27+
}
28+
29+
-- Internal queue event, supplying job or quitting the worker
30+
data Event a = Quit | Job a
31+
32+
-- | Running pool
33+
newtype Pool a = Pool
34+
{ queue :: UnliftIO.TBQueue (Event a)
35+
}
36+
37+
-- | Add (dynamically) created a job to the pool
38+
--
39+
-- This function will block if the max queue size would be overflown.
40+
-- As the workers create space in the queue this function will unblock.
41+
pushJob :: MonadIO m => Pool a -> a -> m ()
42+
pushJob Pool{..} item
43+
= UnliftIO.atomically
44+
$ UnliftIO.writeTBQueue queue (Job item)
45+
46+
-- | Run worker pool with specified config
47+
--
48+
-- The function will return if either:
49+
-- * the `produceJobs` function returns
50+
-- * a worker or throws an error
51+
-- * the producer throws an error.
52+
--
53+
-- Care is taken to not leak threads via the use if `withAsync` from the `async` package.
54+
runPool :: forall a m . MonadUnliftIO m => Config a -> m ()
55+
runPool Config{..} = do
56+
pool@Pool{..} <- UnliftIO.atomically $ do
57+
queue <- UnliftIO.newTBQueue queueSize
58+
pure Pool{..}
59+
60+
boot pool $ \handlers -> do
61+
produceJobs pool -- supply jobs
62+
UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit -- signal workers to gracefully exit
63+
traverse_ UnliftIO.wait handlers -- wait for workers to gracefully exit
64+
where
65+
boot :: Pool a -> ([UnliftIO.Async ()] -> m ()) -> m ()
66+
boot Pool{..} withAsyncHandlers = go 0 []
67+
where
68+
go :: Natural -> [UnliftIO.Async ()] -> m ()
69+
go index asyncHandlers =
70+
if index == workerCount
71+
then withAsyncHandlers asyncHandlers
72+
else
73+
UnliftIO.withAsync
74+
(workLoop =<< workerRun index)
75+
(\async -> go (succ index) (async:asyncHandlers))
76+
77+
workLoop action =
78+
pullJob >>= maybe (pure ()) (\item -> action item >> workLoop action)
79+
80+
pullJob
81+
= UnliftIO.atomically
82+
$ UnliftIO.readTBQueue queue >>= \case
83+
(Job item) -> pure $ pure item
84+
Quit -> UnliftIO.writeTBQueue queue Quit $> empty

work-pool/test/Test.hs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import Control.Arrow (left)
2+
import Control.Monad (when)
3+
import MPrelude
4+
import Test.Tasty
5+
import Test.Tasty.HUnit
6+
7+
import qualified Data.List as List
8+
import qualified Data.Set as Set
9+
import qualified Data.String as String
10+
import qualified Devtools
11+
import qualified UnliftIO.Concurrent as UnliftIO
12+
import qualified UnliftIO.Exception as UnliftIO
13+
import qualified WorkPool
14+
15+
main :: IO ()
16+
main
17+
= defaultMain
18+
$ testGroup "work-pool"
19+
[ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"])
20+
, mkSuccess 1 1
21+
, mkSuccess 1 100
22+
, mkSuccess 100 1
23+
, mkSuccess 100 100
24+
, mkSuccess 1000 1000
25+
, producerFailure
26+
, workerFailure
27+
]
28+
where
29+
mkSuccess :: Natural -> Natural -> TestTree
30+
mkSuccess queueSize workerCount =
31+
testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do
32+
output <- UnliftIO.newMVar []
33+
WorkPool.runPool $ config output
34+
assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output
35+
where
36+
config output = WorkPool.Config{..}
37+
where
38+
workerRun :: MonadUnliftIO m => Natural -> m (Natural -> m ())
39+
workerRun _index = pure $ \value -> do
40+
void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ())
41+
42+
workerFailure :: TestTree
43+
workerFailure = testCase "worker failure" $ do
44+
result <- UnliftIO.try (WorkPool.runPool config)
45+
assertEqual "" (Left "intentional error\n") (left formatException result)
46+
where
47+
config = WorkPool.Config{queueSize = 100, workerCount = 100, ..}
48+
49+
workerRun :: MonadIO m => Natural -> m (Natural -> m ())
50+
workerRun _index = pure $ \value ->
51+
when (value == 100) $ UnliftIO.throwString "intentional error"
52+
53+
producerFailure :: TestTree
54+
producerFailure = testCase "producer failure" $ do
55+
result <- UnliftIO.try (WorkPool.runPool config)
56+
assertEqual "" (Left "intentional error\n") (left formatException result)
57+
where
58+
config
59+
= WorkPool.Config
60+
{ produceJobs = produceJobsFailing
61+
, queueSize = 100
62+
, workerCount = 100
63+
, ..
64+
}
65+
66+
produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m ()
67+
produceJobsFailing pool = do
68+
WorkPool.pushJob pool 1
69+
UnliftIO.throwString "intentional error"
70+
71+
workerRun :: MonadIO m => Natural -> m (Natural -> m ())
72+
workerRun _index = pure . const $ pure ()
73+
74+
formatException :: UnliftIO.SomeException -> String
75+
formatException
76+
= String.unlines
77+
. List.drop 2
78+
. List.take 3
79+
. String.lines
80+
. UnliftIO.displayException
81+
82+
produceJobs :: MonadIO m => WorkPool.Pool Natural -> m ()
83+
produceJobs pool = traverse_ (WorkPool.pushJob pool) values
84+
85+
values :: [Natural]
86+
values = [0..1000]
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
Diff 0.4.1
2+
OneTuple 0.4.1.1
3+
QuickCheck 2.14.3
4+
StateVar 1.2.2
5+
aeson 2.1.2.1
6+
alex 3.3.0.0
7+
ansi-terminal 0.11.5
8+
ansi-terminal-types 0.11.5
9+
ansi-wl-pprint 0.6.9
10+
array 0.5.4.0
11+
assoc 1.1
12+
async 2.2.5
13+
attoparsec 0.14.4
14+
base 4.17.2.1
15+
base-compat 0.12.3
16+
base-compat-batteries 0.12.3
17+
base-orphans 0.9.1
18+
bifunctors 5.5.15
19+
binary 0.8.9.1
20+
bitvec 1.1.5.0
21+
bytestring 0.11.5.3
22+
call-stack 0.4.0
23+
clock 0.8.4
24+
cmdargs 0.10.22
25+
colour 2.3.6
26+
comonad 5.0.8
27+
conduit 1.3.5
28+
containers 0.6.7
29+
contravariant 1.5.5
30+
cpphs 1.20.9.1
31+
data-default 0.7.1.1
32+
data-default-class 0.1.2.0
33+
data-default-instances-containers 0.0.1
34+
data-default-instances-dlist 0.0.1
35+
data-default-instances-old-locale 0.0.1
36+
data-fix 0.3.2
37+
deepseq 1.4.8.0
38+
deriving-aeson 0.2.9
39+
devtools 0.2.0
40+
directory 1.3.7.1
41+
distributive 0.6.2.1
42+
dlist 1.0
43+
exceptions 0.10.5
44+
extra 1.7.14
45+
file-embed 0.0.15.0
46+
filepath 1.4.2.2
47+
filepattern 0.1.3
48+
foldable1-classes-compat 0.1
49+
generically 0.1.1
50+
ghc 9.4.8
51+
ghc-bignum 1.3
52+
ghc-boot 9.4.8
53+
ghc-boot-th 9.4.8
54+
ghc-heap 9.4.8
55+
ghc-lib-parser 9.4.8.20231111
56+
ghc-lib-parser-ex 9.4.0.0
57+
ghc-prim 0.9.1
58+
ghci 9.4.8
59+
happy 1.20.1.1
60+
hashable 1.4.3.0
61+
hlint 3.5
62+
hpc 0.6.1.0
63+
hscolour 1.24.4
64+
indexed-traversable 0.1.3
65+
indexed-traversable-instances 0.1.1.2
66+
integer-logarithms 1.0.3.1
67+
libyaml 0.1.2
68+
mono-traversable 1.0.15.3
69+
mprelude 0.2.3
70+
mtl 2.2.2
71+
old-locale 1.0.0.7
72+
optparse-applicative 0.17.1.0
73+
parsec 3.1.16.1
74+
polyparse 1.13
75+
pretty 1.1.3.6
76+
primitive 0.8.0.0
77+
process 1.6.18.0
78+
random 1.2.1.1
79+
refact 0.3.0.2
80+
resourcet 1.2.6
81+
rts 1.0.2
82+
safe-exceptions 0.1.7.4
83+
scientific 0.3.7.0
84+
semialign 1.3
85+
semigroupoids 5.3.7
86+
source-constraints 0.0.5
87+
split 0.2.3.5
88+
splitmix 0.1.0.5
89+
stm 2.5.1.0
90+
strict 0.5
91+
syb 0.7.2.4
92+
tagged 0.8.7
93+
tasty 1.4.3
94+
tasty-expected-failure 0.12.3
95+
tasty-hunit 0.10.1
96+
tasty-mgolden 0.0.2
97+
template-haskell 2.19.0.0
98+
terminfo 0.4.1.5
99+
text 2.0.2
100+
text-short 0.1.5
101+
th-abstraction 0.4.5.0
102+
th-lift 0.8.4
103+
these 1.2
104+
time 1.12.2
105+
time-compat 1.9.6.1
106+
transformers 0.5.6.2
107+
transformers-compat 0.7.2
108+
typed-process 0.2.11.1
109+
unbounded-delays 0.1.1.1
110+
uniplate 1.6.13
111+
unix 2.7.3
112+
unliftio 0.2.25.0
113+
unliftio-core 0.2.1.0
114+
unordered-containers 0.2.19.1
115+
utf8-string 1.0.2
116+
uuid-types 1.0.5.1
117+
vector 0.13.1.0
118+
vector-algorithms 0.9.0.1
119+
vector-stream 0.1.0.0
120+
witherable 0.4.2
121+
work-pool 0.0.1
122+
yaml 0.11.11.2

0 commit comments

Comments
 (0)