 #    include <unistd.h>
 #endif

-ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
-
 #if ILMTHREAD_THREADING_ENABLED
 #    define ENABLE_THREADING
+#    if ILMTHREAD_USE_TBB
+#        include <oneapi/tbb/task_arena.h>
+using namespace oneapi;
+#    endif
 #endif

+ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
+
 namespace
 {

@@ -56,6 +60,7 @@ handleProcessTask (Task* task)
     }
 }

+#ifdef ENABLE_THREADING
 struct DefaultThreadPoolData
 {
     Semaphore _taskSemaphore; // threads wait on this for ready tasks
@@ -81,6 +86,7 @@ struct DefaultThreadPoolData
         _stopping = false;
     }
 };
+#endif

 } // namespace

@@ -95,11 +101,11 @@ struct TaskGroup::Data
     Data (Data&&) = delete;
     Data& operator= (Data&&) = delete;

+    void waitForEmpty ();
+
     void addTask ();
     void removeTask ();

-    void waitForEmpty ();
-
     std::atomic<int> numPending;
     std::atomic<int> inFlight;
     Semaphore isEmpty; // used to signal that the taskgroup is empty
@@ -110,10 +116,11 @@ struct ThreadPool::Data
     using ProviderPtr = std::shared_ptr<ThreadPoolProvider>;

     Data ();
+    Data (ThreadPoolProvider *p);
     ~Data ();
     Data (const Data&) = delete;
     Data& operator= (const Data&) = delete;
-    Data (Data&&) = delete;
+    Data (Data&&) = default;
     Data& operator= (Data&&) = delete;

     ProviderPtr getProvider () const { return std::atomic_load (&_provider); }
@@ -130,6 +137,63 @@ struct ThreadPool::Data
 namespace
 {

+#if ILMTHREAD_USE_TBB
+class TBBThreadPoolProvider : public ThreadPoolProvider
+{
+public:
+    TBBThreadPoolProvider (int count) { setNumThreads (count); }
+    TBBThreadPoolProvider (const TBBThreadPoolProvider&) = delete;
+    TBBThreadPoolProvider&
+    operator= (const TBBThreadPoolProvider&) = delete;
+    TBBThreadPoolProvider (TBBThreadPoolProvider&&) = delete;
+    TBBThreadPoolProvider& operator= (TBBThreadPoolProvider&&) = delete;
+    ~TBBThreadPoolProvider () noexcept override
+    {
+        finish ();
+    }
+
+    int numThreads () const override
+    {
+        return _arena ? _arena->max_concurrency () : 1;
+    }
+    void setNumThreads (int count) override
+    {
+        if (_arena)
+            _arena->terminate ();
+        _arena.reset ();
+
+        if (count > 1)
+        {
+            _arena = std::make_unique<tbb::task_arena> (count);
+            _arena->initialize ();
+        }
+    }
+
+    void addTask (Task* task) override
+    {
+        if (_arena)
+        {
+            _arena->enqueue ([=] ()
+            {
+                handleProcessTask (task);
+            });
+        }
+        else
+            handleProcessTask (task);
+    }
+
+    void finish () override
+    {
+        if (_arena)
+            _arena->terminate ();
+        _arena.reset ();
+    }
+private:
+
+    std::unique_ptr<tbb::task_arena> _arena;
+};
+#endif
+
 //
 // class DefaultThreadPoolProvider
 //
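As context for the class above (illustrative, not part of the patch): tasks submitted through the public IlmThread API reach ThreadPoolProvider::addTask, and with this provider they end up in handleProcessTask via the arena enqueue shown above. A minimal sketch, assuming the public Task / TaskGroup / ThreadPool API from a typical OpenEXR install; SampleTask, the header path, and the task count are made up for illustration:

// Usage sketch only. SampleTask and the counts are hypothetical; the
// header path and namespace alias are assumed from a normal install.
#include <OpenEXR/IlmThreadPool.h>

#include <iostream>

using namespace IlmThread;

struct SampleTask final : Task
{
    SampleTask (TaskGroup* g, int i) : Task (g), _i (i) {}
    // Each task is executed once by the pool's provider and then deleted.
    void execute () override { std::cout << "task " << _i << "\n"; }
    int _i;
};

int
main ()
{
    TaskGroup group;
    for (int i = 0; i < 8; ++i)
        ThreadPool::addGlobalTask (new SampleTask (&group, i));
    // ~TaskGroup blocks until every queued task has run, regardless of
    // which provider (default threads or a TBB arena) backs the pool.
    return 0;
}

A custom provider would be installed with ThreadPool::setThreadProvider; since TBBThreadPoolProvider sits in an anonymous namespace in this file, code outside this translation unit would need its own equivalent class.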
@@ -331,7 +395,8 @@ DefaultThreadPoolProvider::threadLoop (
 // struct TaskGroup::Data
 //

-TaskGroup::Data::Data () : numPending (0), inFlight (0), isEmpty (1)
+TaskGroup::Data::Data ()
+    : numPending (0), inFlight (0), isEmpty (1)
 {}

 TaskGroup::Data::~Data ()
@@ -402,6 +467,12 @@ ThreadPool::Data::Data ()
     // empty
 }

+ThreadPool::Data::Data (ThreadPoolProvider *p)
+    : _provider (p)
+{
+    // empty
+}
+
 ThreadPool::Data::~Data ()
 {
     setProvider (nullptr);
@@ -485,6 +556,17 @@ ThreadPool::ThreadPool (unsigned nthreads)
 #endif
 }

+// private constructor to avoid multiple calls
+ThreadPool::ThreadPool (Data&& d)
+    :
+#ifdef ENABLE_THREADING
+    _data (new Data (std::move (d)))
+#else
+    _data (nullptr)
+#endif
+{
+}
+
 ThreadPool::~ThreadPool ()
 {
 #ifdef ENABLE_THREADING
@@ -580,9 +662,19 @@ ThreadPool::globalThreadPool ()
     //
     // The global thread pool
     //
-
+#if ILMTHREAD_USE_TBB
+    // Use TBB for the global thread pool when ILMTHREAD_USE_TBB is on.
+    //
+    // This is not (currently) the default thread pool provider, as it
+    // can easily cause recursive mutex deadlocks: TBB shares a single
+    // thread pool between multiple arenas.
+    static ThreadPool gThreadPool (
+        ThreadPool::Data (
+            new TBBThreadPoolProvider (
+                tbb::this_task_arena::max_concurrency ())));
+#else
     static ThreadPool gThreadPool (0);
-
+#endif
     return gThreadPool;
 }

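A small standalone sketch (not from this patch) of the sharing behavior the comment above warns about, using only documented oneTBB calls; the thread counts are arbitrary:

// Every tbb::task_arena in a process draws workers from one shared TBB
// pool; global_control caps that pool for all arenas at once.
#include <oneapi/tbb/global_control.h>
#include <oneapi/tbb/task_arena.h>

int
main ()
{
    // Cap the single process-wide TBB worker pool at 2 threads.
    oneapi::tbb::global_control cap (
        oneapi::tbb::global_control::max_allowed_parallelism, 2);

    // Each arena asks for 8 slots, but both are carved out of the same
    // shared pool; neither gets an independent set of threads.
    oneapi::tbb::task_arena a (8);
    oneapi::tbb::task_arena b (8);

    a.execute ([] { /* work here shares workers with arena b */ });
    b.execute ([] { /* ... and vice versa */ });
    return 0;
}

Because every arena borrows from that one pool, blocking in one arena while waiting on work queued to another is the nesting pattern that can deadlock, which is why the TBB-backed global pool stays behind the ILMTHREAD_USE_TBB build option.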
@@ -596,24 +688,28 @@ unsigned
 ThreadPool::estimateThreadCountForFileIO ()
 {
 #ifdef ENABLE_THREADING
+#    if ILMTHREAD_USE_TBB
+    return tbb::this_task_arena::max_concurrency ();
+#    else
     unsigned rv = std::thread::hardware_concurrency ();
     // hardware concurrency is not required to work
     if (rv == 0 ||
         rv > static_cast<unsigned> (std::numeric_limits<int>::max ()))
     {
         rv = 1;
-#    if (defined(_WIN32) || defined(_WIN64))
+#        if (defined(_WIN32) || defined(_WIN64))
         SYSTEM_INFO si;
         GetNativeSystemInfo (&si);

         rv = si.dwNumberOfProcessors;
-#    else
+#        else
         // linux, bsd, and mac are fine with this
         // other *nix should be too, right?
         rv = sysconf (_SC_NPROCESSORS_ONLN);
-#    endif
+#        endif
     }
     return rv;
+#    endif
 #else
     return 0;
 #endif
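For completeness, a hedged sketch (not in the patch) of how an application might consume this estimate; the header path and namespace alias are assumptions about a typical install:

#include <OpenEXR/IlmThreadPool.h>

using namespace IlmThread;

int
main ()
{
    // 0 means threading is disabled in this build; otherwise size the
    // global pool from the estimate before doing threaded file I/O.
    unsigned n = ThreadPool::estimateThreadCountForFileIO ();
    if (n > 0)
        ThreadPool::globalThreadPool ().setNumThreads (static_cast<int> (n));
    return 0;
}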