2 /*********************************************************
4 * Copyright (C) 2014 by Vitaliy Vitsentiy
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 *********************************************************/
21 #ifndef __ctpl_thread_pool_H__
22 #define __ctpl_thread_pool_H__
32 #include <boost/lockfree/queue.hpp>
35 #ifndef _ctplThreadPoolLength_
36 #define _ctplThreadPoolLength_ 100
40 // thread pool to run user's functors with signature
41 // ret func(int id, other_params)
42 // where id is the index of the thread that runs the functor
43 // ret is some return type
52 thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
53 thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
55 // the destructor waits for all the functions in the queue to be finished
60 // get the number of running threads in the pool
61 int size() { return static_cast<int>(this->threads.size()); }
63 // number of idle threads
64 int n_idle() { return this->nWaiting; }
65 std::thread & get_thread(int i) { return *this->threads[i]; }
67 // change the number of threads in the pool
68 // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
69 // nThreads must be >= 0
70 void resize(int nThreads) {
71 if (!this->isStop && !this->isDone) {
72 int oldNThreads = static_cast<int>(this->threads.size());
73 if (oldNThreads <= nThreads) { // if the number of threads is increased
74 this->threads.resize(nThreads);
75 this->flags.resize(nThreads);
77 for (int i = oldNThreads; i < nThreads; ++i) {
78 this->flags[i] = std::make_shared<std::atomic<bool>>(false);
82 else { // the number of threads is decreased
83 for (int i = oldNThreads - 1; i >= nThreads; --i) {
84 *this->flags[i] = true; // this thread will finish
85 this->threads[i]->detach();
88 // stop the detached threads that were waiting
89 std::unique_lock<std::mutex> lock(this->mutex);
90 this->cv.notify_all();
92 this->threads.resize(nThreads); // safe to delete because the threads are detached
93 this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
100 std::function<void(int id)> * _f;
101 while (this->q.pop(_f))
102 delete _f; // empty the queue
// pops a functional wrapper to the original function
106 std::function<void(int)> pop() {
107 std::function<void(int id)> * _f = nullptr;
109 std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
111 std::function<void(int)> f;
118 // wait for all computing threads to finish and stop all threads
// may be called asynchronously to not pause the calling thread while waiting
120 // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
121 void stop(bool isWait = false) {
126 for (int i = 0, n = this->size(); i < n; ++i) {
127 *this->flags[i] = true; // command the threads to stop
129 this->clear_queue(); // empty the queue
132 if (this->isDone || this->isStop)
134 this->isDone = true; // give the waiting threads a command to finish
137 std::unique_lock<std::mutex> lock(this->mutex);
138 this->cv.notify_all(); // stop all waiting threads
140 for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
141 if (this->threads[i]->joinable())
142 this->threads[i]->join();
144 // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
145 // therefore delete them here
147 this->threads.clear();
151 template<typename F, typename... Rest>
152 auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
153 auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
154 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
157 auto _f = new std::function<void(int id)>([pck](int id) {
162 std::unique_lock<std::mutex> lock(this->mutex);
163 this->cv.notify_one();
165 return pck->get_future();
// run the user's function that accepts argument int - id of the running thread. returned value is templatized
// operator returns std::future, where the user can get the result and rethrow the caught exceptions
171 auto push(F && f) ->std::future<decltype(f(0))> {
172 auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
174 auto _f = new std::function<void(int id)>([pck](int id) {
179 std::unique_lock<std::mutex> lock(this->mutex);
180 this->cv.notify_one();
182 return pck->get_future();
189 thread_pool(const thread_pool &);// = delete;
190 thread_pool(thread_pool &&);// = delete;
191 thread_pool & operator=(const thread_pool &);// = delete;
192 thread_pool & operator=(thread_pool &&);// = delete;
194 void set_thread(int i) {
195 std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
196 auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
197 std::atomic<bool> & _flag = *flag;
198 std::function<void(int id)> * _f;
199 bool isPop = this->q.pop(_f);
201 while (isPop) { // if there is anything in the queue
202 std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
206 return; // the thread is wanted to stop, return even if the queue is not empty yet
208 isPop = this->q.pop(_f);
211 // the queue is empty here, wait for the next command
212 std::unique_lock<std::mutex> lock(this->mutex);
214 this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
218 return; // if the queue is empty and this->isDone == true or *flag then return
221 this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
224 void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
226 std::vector<std::unique_ptr<std::thread>> threads;
227 std::vector<std::shared_ptr<std::atomic<bool>>> flags;
228 mutable boost::lockfree::queue<std::function<void(int id)> *> q;
229 std::atomic<bool> isDone;
230 std::atomic<bool> isStop;
231 std::atomic<int> nWaiting; // how many threads are waiting
234 std::condition_variable cv;
239 #endif // __ctpl_thread_pool_H__