77//!
88
99extern crate variance;
10- extern crate crossbeam ;
10+ extern crate crossbeam_channel ;
1111
1212#[ macro_use]
1313extern crate scopeguard;
1414
1515use variance:: InvariantLifetime as Id ;
16- use crossbeam :: sync :: MsQueue ;
16+ use crossbeam_channel :: { Sender , Receiver } ;
1717
1818use std:: { thread, mem} ;
1919use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
@@ -128,7 +128,7 @@ impl Pool {
128128 #[ inline]
129129 pub fn shutdown ( & self ) {
130130 // Start the shutdown process.
131- self . inner . queue . push ( PoolMessage :: Quit ) ;
131+ let _ = self . inner . tx . send ( PoolMessage :: Quit ) ;
132132
133133 // Wait for it to complete.
134134 self . wait . join ( )
@@ -165,18 +165,18 @@ impl Pool {
165165 let mut thread_sentinel = ThreadSentinel ( Some ( self . clone ( ) ) ) ;
166166
167167 loop {
168- match self . inner . queue . pop ( ) {
168+ match self . inner . rx . recv ( ) . unwrap ( ) {
169169 // On Quit, repropogate and quit.
170170 PoolMessage :: Quit => {
171171 // Repropogate the Quit message to other threads.
172- self . inner . queue . push ( PoolMessage :: Quit ) ;
172+ let _ = self . inner . tx . send ( PoolMessage :: Quit ) ;
173173
174174 // Cancel the thread sentinel so we don't panic waiting
175175 // shutdown threads, and don't restart the thread.
176176 thread_sentinel. cancel ( ) ;
177177
178178 // Terminate the thread.
179- break
179+ break ;
180180 } ,
181181
182182 // On Task, run the task then complete the WaitGroup.
@@ -191,21 +191,27 @@ impl Pool {
191191}
192192
193193struct PoolInner {
194- queue : MsQueue < PoolMessage > ,
194+ tx : Sender < PoolMessage > ,
195+ rx : Receiver < PoolMessage > ,
195196 thread_config : ThreadConfig ,
196- thread_counter : AtomicUsize
197+ thread_counter : AtomicUsize ,
197198}
198199
199200impl PoolInner {
200201 fn with_thread_config ( thread_config : ThreadConfig ) -> Self {
201- PoolInner { thread_config : thread_config, ..Self :: default ( ) }
202+ PoolInner {
203+ thread_config,
204+ ..Default :: default ( )
205+ }
202206 }
203207}
204208
205209impl Default for PoolInner {
206210 fn default ( ) -> Self {
211+ let ( tx, rx) = crossbeam_channel:: unbounded ( ) ;
207212 PoolInner {
208- queue : MsQueue :: new ( ) ,
213+ tx,
214+ rx,
209215 thread_config : ThreadConfig :: default ( ) ,
210216 thread_counter : AtomicUsize :: new ( 1 )
211217 }
@@ -279,7 +285,7 @@ impl<'scope> Scope<'scope> {
279285 #[ inline]
280286 pub fn forever ( pool : Pool ) -> Scope < ' static > {
281287 Scope {
282- pool : pool ,
288+ pool,
283289 wait : Arc :: new ( WaitGroup :: new ( ) ) ,
284290 _scope : Id :: default ( )
285291 }
@@ -301,7 +307,7 @@ impl<'scope> Scope<'scope> {
301307 } ;
302308
303309 // Submit the task to be executed.
304- self . pool . inner . queue . push ( PoolMessage :: Task ( task, self . wait . clone ( ) ) ) ;
310+ let _ = self . pool . inner . tx . send ( PoolMessage :: Task ( task, self . wait . clone ( ) ) ) ;
305311 }
306312
307313 /// Add a job to this scope which itself will get access to the scope.
@@ -322,7 +328,7 @@ impl<'scope> Scope<'scope> {
322328 pub fn zoom < ' smaller , F , R > ( & self , scheduler : F ) -> R
323329 where F : FnOnce ( & Scope < ' smaller > ) -> R ,
324330 ' scope : ' smaller {
325- let scope = unsafe { self . refine :: < ' smaller > ( ) } ;
331+ let scope = unsafe { self . refine ( ) } ;
326332
327333 // Join the scope either on completion of the scheduler or panic.
328334 defer ! ( scope. join( ) ) ;
0 commit comments