Salome HOME
Deactivating parallelism for 2D/1D + corrections for non parallel run + adding Parall...
[modules/smesh.git] / src / SMESH / ctpl.h
1
2 /*********************************************************
3  *
4  *  Copyright (C) 2014 by Vitaliy Vitsentiy
5  *
6  *  Licensed under the Apache License, Version 2.0 (the "License");
7  *  you may not use this file except in compliance with the License.
8  *  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *
18  *********************************************************/
19
20
21 #ifndef __ctpl_thread_pool_H__
22 #define __ctpl_thread_pool_H__
23
24 #include <functional>
25 #include <thread>
26 #include <atomic>
27 #include <vector>
28 #include <memory>
29 #include <exception>
30 #include <future>
31 #include <mutex>
32 #include <boost/lockfree/queue.hpp>
33
34
// Default value handed to the task queue's constructor when the pool is created
// without an explicit queueSize. It is only defined here if the including code has
// not already defined _ctplThreadPoolLength_, so it can be overridden before inclusion.
35 #ifndef _ctplThreadPoolLength_
36 #define _ctplThreadPoolLength_  100
37 #endif
38
39
40 // thread pool to run user's functors with signature
41 //      ret func(int id, other_params)
42 // where id is the index of the thread that runs the functor
43 // ret is some return type
44
45
46 namespace ctpl {
47
48     class thread_pool {
49
50     public:
51
52         thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
53         thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
54
55         // the destructor waits for all the functions in the queue to be finished
56         ~thread_pool() {
57             this->stop(true);
58         }
59
60         // get the number of running threads in the pool
61         int size() { return static_cast<int>(this->threads.size()); }
62
63         // number of idle threads
64         int n_idle() { return this->nWaiting; }
65         std::thread & get_thread(int i) { return *this->threads[i]; }
66
67         // change the number of threads in the pool
68         // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
69         // nThreads must be >= 0
70         void resize(int nThreads) {
71             if (!this->isStop && !this->isDone) {
72                 int oldNThreads = static_cast<int>(this->threads.size());
73                 if (oldNThreads <= nThreads) {  // if the number of threads is increased
74                     this->threads.resize(nThreads);
75                     this->flags.resize(nThreads);
76
77                     for (int i = oldNThreads; i < nThreads; ++i) {
78                         this->flags[i] = std::make_shared<std::atomic<bool>>(false);
79                         this->set_thread(i);
80                     }
81                 }
82                 else {  // the number of threads is decreased
83                     for (int i = oldNThreads - 1; i >= nThreads; --i) {
84                         *this->flags[i] = true;  // this thread will finish
85                         this->threads[i]->detach();
86                     }
87                     {
88                         // stop the detached threads that were waiting
89                         std::unique_lock<std::mutex> lock(this->mutex);
90                         this->cv.notify_all();
91                     }
92                     this->threads.resize(nThreads);  // safe to delete because the threads are detached
93                     this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
94                 }
95             }
96         }
97
98         // empty the queue
99         void clear_queue() {
100             std::function<void(int id)> * _f;
101             while (this->q.pop(_f))
102                 delete _f;  // empty the queue
103         }
104
105         // pops a functional wraper to the original function
106         std::function<void(int)> pop() {
107             std::function<void(int id)> * _f = nullptr;
108             this->q.pop(_f);
109             std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
110             
111             std::function<void(int)> f;
112             if (_f)
113                 f = *_f;
114             return f;
115         }
116
117
118         // wait for all computing threads to finish and stop all threads
119         // may be called asyncronously to not pause the calling thread while waiting
120         // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
121         void stop(bool isWait = false) {
122             if (!isWait) {
123                 if (this->isStop)
124                     return;
125                 this->isStop = true;
126                 for (int i = 0, n = this->size(); i < n; ++i) {
127                     *this->flags[i] = true;  // command the threads to stop
128                 }
129                 this->clear_queue();  // empty the queue
130             }
131             else {
132                 if (this->isDone || this->isStop)
133                     return;
134                 this->isDone = true;  // give the waiting threads a command to finish
135             }
136             {
137                 std::unique_lock<std::mutex> lock(this->mutex);
138                 this->cv.notify_all();  // stop all waiting threads
139             }
140             for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
141                 if (this->threads[i]->joinable())
142                     this->threads[i]->join();
143             }
144             // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
145             // therefore delete them here
146             this->clear_queue();
147             this->threads.clear();
148             this->flags.clear();
149         }
150
151         template<typename F, typename... Rest>
152         auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
153             auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
154                 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
155             );
156
157             auto _f = new std::function<void(int id)>([pck](int id) {
158                 (*pck)(id);
159             });
160             this->q.push(_f);
161
162             std::unique_lock<std::mutex> lock(this->mutex);
163             this->cv.notify_one();
164
165             return pck->get_future();
166         }
167
168         // run the user's function that excepts argument int - id of the running thread. returned value is templatized
169         // operator returns std::future, where the user can get the result and rethrow the catched exceptins
170         template<typename F>
171         auto push(F && f) ->std::future<decltype(f(0))> {
172             auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
173
174             auto _f = new std::function<void(int id)>([pck](int id) {
175                 (*pck)(id);
176             });
177             this->q.push(_f);
178
179             std::unique_lock<std::mutex> lock(this->mutex);
180             this->cv.notify_one();
181
182             return pck->get_future();
183         }
184
185
186     private:
187
188         // deleted
189         thread_pool(const thread_pool &);// = delete;
190         thread_pool(thread_pool &&);// = delete;
191         thread_pool & operator=(const thread_pool &);// = delete;
192         thread_pool & operator=(thread_pool &&);// = delete;
193
194         void set_thread(int i) {
195             std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);  // a copy of the shared ptr to the flag
196             auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
197                 std::atomic<bool> & _flag = *flag;
198                 std::function<void(int id)> * _f;
199                 bool isPop = this->q.pop(_f);
200                 while (true) {
201                     while (isPop) {  // if there is anything in the queue
202                         std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
203                         (*_f)(i);
204
205                         if (_flag)
206                             return;  // the thread is wanted to stop, return even if the queue is not empty yet
207                         else
208                             isPop = this->q.pop(_f);
209                     }
210
211                     // the queue is empty here, wait for the next command
212                     std::unique_lock<std::mutex> lock(this->mutex);
213                     ++this->nWaiting;
214                     this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
215                     --this->nWaiting;
216
217                     if (!isPop)
218                         return;  // if the queue is empty and this->isDone == true or *flag then return
219                 }
220             };
221             this->threads[i].reset(new std::thread(f));  // compiler may not support std::make_unique()
222         }
223
224         void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
225
226         std::vector<std::unique_ptr<std::thread>> threads;
227         std::vector<std::shared_ptr<std::atomic<bool>>> flags;
228         mutable boost::lockfree::queue<std::function<void(int id)> *> q;
229         std::atomic<bool> isDone;
230         std::atomic<bool> isStop;
231         std::atomic<int> nWaiting;  // how many threads are waiting
232
233         std::mutex mutex;
234         std::condition_variable cv;
235     };
236
237 }
238
239 #endif // __ctpl_thread_pool_H__
240