Skip to content

Commit 3239364

Browse files
authored
Merge pull request #305 from mbj/add/work-pool
[work-pool] Add work-pool subproject
2 parents e4ec728 + 303f12b commit 3239364

File tree

9 files changed

+618
-0
lines changed

9 files changed

+618
-0
lines changed

stack-9.4.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ flags:
7171
development: true
7272
tasty-mgolden:
7373
development: true
74+
work-pool:
75+
development: true
7476
xray:
7577
development: true
7678
packages:
@@ -97,4 +99,5 @@ packages:
9799
- source-constraints
98100
- stack-deploy
99101
- tasty-mgolden
102+
- work-pool
100103
- xray

stack-9.6.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ flags:
6363
development: true
6464
tasty-mgolden:
6565
development: true
66+
work-pool:
67+
development: true
6668
xray:
6769
development: true
6870
packages:
@@ -89,4 +91,5 @@ packages:
8991
- source-constraints
9092
- stack-deploy
9193
- tasty-mgolden
94+
- work-pool
9295
- xray

work-pool/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# work-pool
2+
3+
A minimal work pool with a fixed maximum queue size and a fixed worker count, supporting per-worker boot and dynamic job production.

work-pool/package.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
_common/package: !include "../common/package.yaml"

name: work-pool
synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot.
homepage: https://github.com/mbj/mhs#readme
github: mbj/mhs
version: 0.0.1

# Merge shared package defaults — the *defaults anchor is presumably
# declared in the included common/package.yaml; hpack resolves it.
<<: *defaults

dependencies:
- base
- containers
- mprelude
- text
- unliftio

tests:
  test:
    # Shared test-suite defaults (same anchor source as *defaults above)
    <<: *test
    dependencies:
    - devtools
    - tasty
    - tasty-hunit
    - work-pool

work-pool/src/WorkPool.hs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
module WorkPool
2+
( Config(..)
3+
, Done
4+
, Pool
5+
, Source
6+
, pushJob
7+
, readJobCount
8+
, runPool
9+
, runSource
10+
)
11+
where
12+
13+
import Data.Set (Set)
14+
import MPrelude
15+
import Prelude (succ)
16+
17+
import qualified Data.Set as Set
18+
import qualified UnliftIO.Async as UnliftIO
19+
import qualified UnliftIO.STM as UnliftIO
20+
21+
-- | Worker pool configuration
data Config m a = Config
  { produceJobs :: Pool a -> m ()
  -- ^ Function called to produce work; use `pushJob` to create workable jobs.
  -- Once this function returns and all jobs are worked off, the workers exit.
  , queueSize :: Natural
  -- ^ Maximum size of the job queue
  , workerCount :: Natural
  -- ^ Number of workers to boot
  , workerRun :: Natural -> Source a -> m Done
  -- ^ Function called when a worker is booted; arguments are the worker index
  -- and a source to be drained with `runSource`
  }
34+
35+
-- Internal queue event: either supplies a job to a worker, or tells the
-- worker to quit because the producer has finished.
data Event a = Quit | Job a
37+
38+
-- | Running pool
--
-- Wraps the bounded event queue shared between producer and workers.
newtype Pool a = Pool
  { queue :: UnliftIO.TBQueue (Event a)
  }
42+
43+
-- | Source to be drained
--
-- Same underlying queue as `Pool`, but typed for the consumer side.
newtype Source a = Source
  { queue :: UnliftIO.TBQueue (Event a)
  }
47+
48+
-- | Type forcing clients to drain the source via `runSource`
--
-- The constructor is not exported, so `runSource` is the only way for a
-- `workerRun` implementation to obtain the `Done` its signature requires.
data Done = Done
50+
51+
-- | Add a (dynamically created) job to the pool
--
-- Blocks while the queue is at its maximum size; as workers drain items
-- and create space in the queue this function unblocks.
pushJob :: MonadIO m => Pool a -> a -> m ()
pushJob (Pool jobQueue) item =
  UnliftIO.atomically (UnliftIO.writeTBQueue jobQueue (Job item))
59+
60+
-- | Read the number of jobs currently queued in the pool
--
-- Does not block. It's not guaranteed the count is still correct when the
-- function returns, since workers consume jobs concurrently.
readJobCount :: MonadIO m => Pool a -> m Natural
readJobCount (Pool jobQueue) =
  UnliftIO.atomically (UnliftIO.lengthTBQueue jobQueue)
68+
69+
-- | Drain a source
--
-- This function:
-- * is executed 0 or many times per worker.
-- * executes the action as long as there are items in the queue.
-- * blocks on an empty queue as long as the producer has not exited.
-- * returns once the producer has exited and the queue is empty.
-- * is the only way to produce a `Done` value required by the `Config` API.
--
-- On observing `Quit` the event is written back to the queue so sibling
-- workers also observe it and shut down.
runSource :: MonadIO m => (a -> m ()) -> Source a -> m Done
runSource action (Source jobQueue) = drain $> Done
  where
    drain = do
      event <- UnliftIO.atomically (UnliftIO.readTBQueue jobQueue)
      case event of
        Job item -> action item >> drain
        Quit     -> UnliftIO.atomically (UnliftIO.writeTBQueue jobQueue Quit)
83+
84+
-- | Run worker pool with specified config
--
-- The function will return if either:
-- * the `produceJobs` function returns and all jobs are worked off
-- * a worker throws an error
-- * the producer throws an error.
--
-- Care is taken to not leak threads via the use of `withAsync` from the `async` package.
runPool :: forall a m . MonadUnliftIO m => Config m a -> m ()
runPool Config{..} = boot =<< UnliftIO.atomically (UnliftIO.newTBQueue queueSize)
  where
    -- Boot workers one at a time, nesting `withAsync` scopes so that every
    -- spawned thread is cancelled when any sibling (or the producer) fails.
    boot queue = go 0 []
      where
        go index workerHandlers =
          if index == workerCount
            then
              -- All workers booted: start the producer innermost, then wait
              -- on producer and workers together.
              UnliftIO.withAsync
                (runProducer queue)
                (\producerHandle -> waitAll (Set.fromList (producerHandle:workerHandlers)))
            else
              UnliftIO.withAsync
                (workerRun index Source{..})
                (\async -> go (succ index) (async:workerHandlers))

    -- Run the job producer; once it returns, enqueue `Quit` so workers
    -- drain the remaining jobs and then shut down (see `runSource`).
    runProducer queue = do
      produceJobs Pool{..}
      UnliftIO.atomically (UnliftIO.writeTBQueue queue Quit)
      pure Done

    -- Wait for every handle; `waitAny` re-throws if the finished thread
    -- failed, which unwinds through the `withAsync` scopes cancelling the
    -- rest.
    waitAll :: Set (UnliftIO.Async Done) -> m ()
    waitAll remaining = case Set.toList remaining of
      [] -> pure ()
      [handler] -> void $ UnliftIO.wait handler
      list -> do
        (handler, _result) <- UnliftIO.waitAny list
        waitAll $ Set.delete handler remaining

work-pool/test/Test.hs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import Control.Arrow (left)
2+
import Control.Monad (when)
3+
import MPrelude
4+
import Test.Tasty
5+
import Test.Tasty.HUnit
6+
7+
import qualified Data.List as List
8+
import qualified Data.Set as Set
9+
import qualified Data.String as String
10+
import qualified Devtools
11+
import qualified UnliftIO.Concurrent as UnliftIO
12+
import qualified UnliftIO.Exception as UnliftIO
13+
import qualified WorkPool
14+
15+
main :: IO ()
main
  = defaultMain
  $ testGroup "work-pool"
  [ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"])
  , mkSuccess 1 1
  , mkSuccess 1 100
  , mkSuccess 100 1
  , mkSuccess 100 100
  , mkSuccess 1000 1000
  , producerFailure
  , workerFailure
  ]
  where
    -- Success case: run the pool over `values` across several
    -- queue-size/worker-count shapes and check that every value was
    -- observed, regardless of which worker handled it.
    mkSuccess :: Natural -> Natural -> TestTree
    mkSuccess queueSize workerCount =
      testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do
        -- NOTE(review): a list literal is used where a Set is expected —
        -- presumably OverloadedLists is enabled via the shared package
        -- config; confirm.
        output <- UnliftIO.newMVar []
        WorkPool.runPool $ config output
        assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output
      where
        -- RecordWildCards fills `queueSize`/`workerCount` from the
        -- arguments above and `produceJobs` from the top level.
        config output = WorkPool.Config{..}
          where
            workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done
            workerRun _index = WorkPool.runSource $ \value -> do
              void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ())
41+
42+
-- | A worker throwing must surface as an exception from `runPool`
workerFailure :: TestTree
workerFailure = testCase "worker failure" $ do
  result <- UnliftIO.try (WorkPool.runPool config)
  assertEqual "" (Left "intentional error\n") (left formatException result)
  where
    config = WorkPool.Config
      { produceJobs = produceJobs
      , queueSize   = 100
      , workerCount = 100
      , workerRun   = failOnTrigger
      }

    -- Fails once the trigger value 100 is seen; other values are accepted.
    failOnTrigger :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done
    failOnTrigger _index =
      WorkPool.runSource $ \value ->
        when (value == 100) $ UnliftIO.throwString "intentional error"
53+
54+
-- | A producer throwing must surface as an exception from `runPool`
producerFailure :: TestTree
producerFailure = testCase "producer failure" $ do
  result <- UnliftIO.try (WorkPool.runPool config)
  assertEqual "" (Left "intentional error\n") (left formatException result)
  where
    config :: WorkPool.Config IO Natural
    config
      = WorkPool.Config
      { produceJobs = produceJobsFailing
      , queueSize = 100
      , workerCount = 100
      , ..
      }

    -- Push a single job, then fail mid-production.
    produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m ()
    produceJobsFailing pool = do
      WorkPool.pushJob pool 1
      UnliftIO.throwString "intentional error"

    -- Workers drain jobs without doing any work.
    workerRun :: MonadIO m => Natural -> WorkPool.Source Natural -> m WorkPool.Done
    workerRun _index = WorkPool.runSource $ const (pure ())
75+
76+
-- | Reduce a displayed exception to just its message line
--
-- `displayException` output for the thrown string exceptions carries a
-- header before the message; the failure tests pin the message to the
-- third rendered line, which this slices out (trailing newline added by
-- `unlines`).
formatException :: UnliftIO.SomeException -> String
formatException exception =
  String.unlines
    (List.take 1 (List.drop 2 (String.lines (UnliftIO.displayException exception))))
83+
84+
-- | Feed every element of `values` into the pool, in order
produceJobs :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobs pool = traverse_ push values
  where
    push = WorkPool.pushJob pool
86+
87+
-- | Job payloads used by the success tests: 0 through 1000 inclusive
values :: [Natural]
values = [0, 1 .. 1000]
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
Diff 0.4.1
2+
OneTuple 0.4.1.1
3+
QuickCheck 2.14.3
4+
StateVar 1.2.2
5+
aeson 2.1.2.1
6+
alex 3.3.0.0
7+
ansi-terminal 0.11.5
8+
ansi-terminal-types 0.11.5
9+
ansi-wl-pprint 0.6.9
10+
array 0.5.4.0
11+
assoc 1.1
12+
async 2.2.5
13+
attoparsec 0.14.4
14+
base 4.17.2.1
15+
base-compat 0.12.3
16+
base-compat-batteries 0.12.3
17+
base-orphans 0.9.1
18+
bifunctors 5.5.15
19+
binary 0.8.9.1
20+
bitvec 1.1.5.0
21+
bytestring 0.11.5.3
22+
call-stack 0.4.0
23+
clock 0.8.4
24+
cmdargs 0.10.22
25+
colour 2.3.6
26+
comonad 5.0.8
27+
conduit 1.3.5
28+
containers 0.6.7
29+
contravariant 1.5.5
30+
cpphs 1.20.9.1
31+
data-default 0.7.1.1
32+
data-default-class 0.1.2.0
33+
data-default-instances-containers 0.0.1
34+
data-default-instances-dlist 0.0.1
35+
data-default-instances-old-locale 0.0.1
36+
data-fix 0.3.2
37+
deepseq 1.4.8.0
38+
deriving-aeson 0.2.9
39+
devtools 0.2.0
40+
directory 1.3.7.1
41+
distributive 0.6.2.1
42+
dlist 1.0
43+
exceptions 0.10.5
44+
extra 1.7.14
45+
file-embed 0.0.15.0
46+
filepath 1.4.2.2
47+
filepattern 0.1.3
48+
foldable1-classes-compat 0.1
49+
generically 0.1.1
50+
ghc 9.4.8
51+
ghc-bignum 1.3
52+
ghc-boot 9.4.8
53+
ghc-boot-th 9.4.8
54+
ghc-heap 9.4.8
55+
ghc-lib-parser 9.4.8.20231111
56+
ghc-lib-parser-ex 9.4.0.0
57+
ghc-prim 0.9.1
58+
ghci 9.4.8
59+
happy 1.20.1.1
60+
hashable 1.4.3.0
61+
hlint 3.5
62+
hpc 0.6.1.0
63+
hscolour 1.24.4
64+
indexed-traversable 0.1.3
65+
indexed-traversable-instances 0.1.1.2
66+
integer-logarithms 1.0.3.1
67+
libyaml 0.1.2
68+
mono-traversable 1.0.15.3
69+
mprelude 0.2.3
70+
mtl 2.2.2
71+
old-locale 1.0.0.7
72+
optparse-applicative 0.17.1.0
73+
parsec 3.1.16.1
74+
polyparse 1.13
75+
pretty 1.1.3.6
76+
primitive 0.8.0.0
77+
process 1.6.18.0
78+
random 1.2.1.1
79+
refact 0.3.0.2
80+
resourcet 1.2.6
81+
rts 1.0.2
82+
safe-exceptions 0.1.7.4
83+
scientific 0.3.7.0
84+
semialign 1.3
85+
semigroupoids 5.3.7
86+
source-constraints 0.0.5
87+
split 0.2.3.5
88+
splitmix 0.1.0.5
89+
stm 2.5.1.0
90+
strict 0.5
91+
syb 0.7.2.4
92+
tagged 0.8.7
93+
tasty 1.4.3
94+
tasty-expected-failure 0.12.3
95+
tasty-hunit 0.10.1
96+
tasty-mgolden 0.0.2
97+
template-haskell 2.19.0.0
98+
terminfo 0.4.1.5
99+
text 2.0.2
100+
text-short 0.1.5
101+
th-abstraction 0.4.5.0
102+
th-lift 0.8.4
103+
these 1.2
104+
time 1.12.2
105+
time-compat 1.9.6.1
106+
transformers 0.5.6.2
107+
transformers-compat 0.7.2
108+
typed-process 0.2.11.1
109+
unbounded-delays 0.1.1.1
110+
uniplate 1.6.13
111+
unix 2.7.3
112+
unliftio 0.2.25.0
113+
unliftio-core 0.2.1.0
114+
unordered-containers 0.2.19.1
115+
utf8-string 1.0.2
116+
uuid-types 1.0.5.1
117+
vector 0.13.1.0
118+
vector-algorithms 0.9.0.1
119+
vector-stream 0.1.0.0
120+
witherable 0.4.2
121+
work-pool 0.0.1
122+
yaml 0.11.11.2

0 commit comments

Comments
 (0)