From: Yoann Audouin Date: Mon, 12 Sep 2022 14:17:20 +0000 (+0200) Subject: Replacing ctpl by boost::thread_pool X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=c3580f2d903f201bbe4989964b31e64eee6966d8;p=modules%2Fsmesh.git Replacing ctpl by boost::thread_pool --- diff --git a/src/SMESH/CMakeLists.txt b/src/SMESH/CMakeLists.txt index b8a9fdf79..ab288a011 100644 --- a/src/SMESH/CMakeLists.txt +++ b/src/SMESH/CMakeLists.txt @@ -90,7 +90,6 @@ SET(SMESHimpl_HEADERS SMESH_SMESH.hxx MG_ADAPT.hxx SMESH_Homard.hxx - ctpl.h DriverMesh.hxx DriverStep.hxx ) diff --git a/src/SMESH/SMESH_Gen.cxx b/src/SMESH/SMESH_Gen.cxx index eb292e928..b7433ae70 100644 --- a/src/SMESH/SMESH_Gen.cxx +++ b/src/SMESH/SMESH_Gen.cxx @@ -49,6 +49,7 @@ #include "memoire.h" #include +#include #ifdef WIN32 #include @@ -58,6 +59,7 @@ using namespace std; #include +#include namespace fs = boost::filesystem; // Environment variable separator @@ -247,7 +249,6 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh, // =============================================== TopAbs_ShapeEnum previousShapeType = TopAbs_VERTEX; - std::vector> pending; int nbThreads = aMesh.GetNbThreads(); auto begin = std::chrono::high_resolution_clock::now(); @@ -269,9 +270,7 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh, //DEBUG std::cout << "Shape Type" << shapeType << " previous" << previousShapeType << std::endl; if ((aMesh.IsParallel()||nbThreads!=0) && shapeType != previousShapeType) { // Waiting for all threads for the previous type to end - for(auto &it: pending){ - it.wait(); - } + aMesh.wait(); std::string file_name; switch(previousShapeType){ @@ -298,7 +297,6 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh, } //Resetting threaded pool info previousShapeType = shapeType; - pending.clear(); } // check for preview dimension limitations @@ -311,9 +309,11 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh, } if(aMesh.IsParallel()) { - pending.push_back(aMesh._pool->push(compute_function, smToCompute, computeEvent, - shapeSM, aShapeOnly, allowedSubShapes, - aShapesId)); + std::cout << "Submitting thread function " << std::endl; + boost::asio::post(*(aMesh._pool), [](){std::cerr<< "In Here" << std::endl;}); + boost::asio::post(*(aMesh._pool), std::bind(compute_function, 1, smToCompute, computeEvent, + shapeSM, aShapeOnly, allowedSubShapes, + aShapesId)); } else { auto begin2 = std::chrono::high_resolution_clock::now(); @@ -334,10 +334,7 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh, // TODO: Check error handling in parallel mode if(aMesh.IsParallel()){ // Waiting for the thread for Solids to finish - for(auto &it:pending){ - it.wait(); - } - pending.clear(); + aMesh.wait(); } aMesh.GetMeshDS()->Modified(); diff --git a/src/SMESH/SMESH_Gen.hxx b/src/SMESH/SMESH_Gen.hxx index 02f70250d..6bb0fe1c4 100644 --- a/src/SMESH/SMESH_Gen.hxx +++ b/src/SMESH/SMESH_Gen.hxx @@ -34,8 +34,6 @@ #include "SMESH_Algo.hxx" #include "SMESH_ComputeError.hxx" -#include "ctpl.h" - #include #include diff --git a/src/SMESH/SMESH_Mesh.hxx b/src/SMESH/SMESH_Mesh.hxx index f6847260b..533b5648f 100644 --- a/src/SMESH/SMESH_Mesh.hxx +++ b/src/SMESH/SMESH_Mesh.hxx @@ -43,13 +43,12 @@ #include "MEDCouplingMemArray.hxx" -#include "ctpl.h" - #include #include #include #include #include +#include #ifdef WIN32 #pragma warning(disable:4251) // Warning DLL Interface ... @@ -385,7 +384,7 @@ class SMESH_EXPORT SMESH_Mesh std::ostream& Dump(std::ostream & save); - // Data for parallel computation + // Parallel computation functions void Lock() {_my_lock.lock();}; void Unlock() {_my_lock.unlock();}; @@ -393,15 +392,16 @@ class SMESH_EXPORT SMESH_Mesh int GetNbThreads(){return _NbThreads;}; void SetNbThreads(int nbThreads){_NbThreads=nbThreads;}; - void InitPoolThreads(){_pool = new ctpl::thread_pool(_NbThreads);}; + void InitPoolThreads(){_pool = new boost::asio::thread_pool(_NbThreads);}; void DeletePoolThreads(){delete _pool;}; + void wait(){_pool->join(); DeletePoolThreads(); InitPoolThreads(); } + bool IsParallel(){return _NbThreads > 0;} // Temporary folder used during parallel Computation boost::filesystem::path tmp_folder; - // TODO: Replace by number of thread - ctpl::thread_pool * _pool = nullptr; //thread pool for computation + boost::asio::thread_pool * _pool = nullptr; //thread pool for computation private: diff --git a/src/SMESH/ctpl.h b/src/SMESH/ctpl.h deleted file mode 100644 index 64f650d3e..000000000 --- a/src/SMESH/ctpl.h +++ /dev/null @@ -1,240 +0,0 @@ - -/********************************************************* - * - * Copyright (C) 2014 by Vitaliy Vitsentiy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - *********************************************************/ - - -#ifndef __ctpl_thread_pool_H__ -#define __ctpl_thread_pool_H__ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -#ifndef _ctplThreadPoolLength_ -#define _ctplThreadPoolLength_ 100 -#endif - - -// thread pool to run user's functors with signature -// ret func(int id, other_params) -// where id is the index of the thread that runs the functor -// ret is some return type - - -namespace ctpl { - - class thread_pool { - - public: - - thread_pool() : q(_ctplThreadPoolLength_) { this->init(); } - thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); } - - // the destructor waits for all the functions in the queue to be finished - ~thread_pool() { - this->stop(true); - } - - // get the number of running threads in the pool - int size() { return static_cast(this->threads.size()); } - - // number of idle threads - int n_idle() { return this->nWaiting; } - std::thread & get_thread(int i) { return *this->threads[i]; } - - // change the number of threads in the pool - // should be called from one thread, otherwise be careful to not interleave, also with this->stop() - // nThreads must be >= 0 - void resize(int nThreads) { - if (!this->isStop && !this->isDone) { - int oldNThreads = static_cast(this->threads.size()); - if (oldNThreads <= nThreads) { // if the number of threads is increased - this->threads.resize(nThreads); - this->flags.resize(nThreads); - - for (int i = oldNThreads; i < nThreads; ++i) { - this->flags[i] = std::make_shared>(false); - this->set_thread(i); - } - } - else { // the number of threads is decreased - for (int i = oldNThreads - 1; i >= nThreads; --i) { - *this->flags[i] = true; // this thread will finish - this->threads[i]->detach(); - } - { - // stop the detached threads that were waiting - std::unique_lock lock(this->mutex); - this->cv.notify_all(); - } - this->threads.resize(nThreads); // safe to delete because the threads are detached - this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals - } - } - } - - // empty the queue - void clear_queue() { - std::function * _f; - while (this->q.pop(_f)) - delete _f; // empty the queue - } - - // pops a functional wraper to the original function - std::function pop() { - std::function * _f = nullptr; - this->q.pop(_f); - std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred - - std::function f; - if (_f) - f = *_f; - return f; - } - - - // wait for all computing threads to finish and stop all threads - // may be called asyncronously to not pause the calling thread while waiting - // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions - void stop(bool isWait = false) { - if (!isWait) { - if (this->isStop) - return; - this->isStop = true; - for (int i = 0, n = this->size(); i < n; ++i) { - *this->flags[i] = true; // command the threads to stop - } - this->clear_queue(); // empty the queue - } - else { - if (this->isDone || this->isStop) - return; - this->isDone = true; // give the waiting threads a command to finish - } - { - std::unique_lock lock(this->mutex); - this->cv.notify_all(); // stop all waiting threads - } - for (int i = 0; i < static_cast(this->threads.size()); ++i) { // wait for the computing threads to finish - if (this->threads[i]->joinable()) - this->threads[i]->join(); - } - // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads - // therefore delete them here - this->clear_queue(); - this->threads.clear(); - this->flags.clear(); - } - - template - auto push(F && f, Rest&&... rest) ->std::future { - auto pck = std::make_shared>( - std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...) - ); - - auto _f = new std::function([pck](int id) { - (*pck)(id); - }); - this->q.push(_f); - - std::unique_lock lock(this->mutex); - this->cv.notify_one(); - - return pck->get_future(); - } - - // run the user's function that excepts argument int - id of the running thread. returned value is templatized - // operator returns std::future, where the user can get the result and rethrow the catched exceptins - template - auto push(F && f) ->std::future { - auto pck = std::make_shared>(std::forward(f)); - - auto _f = new std::function([pck](int id) { - (*pck)(id); - }); - this->q.push(_f); - - std::unique_lock lock(this->mutex); - this->cv.notify_one(); - - return pck->get_future(); - } - - - private: - - // deleted - thread_pool(const thread_pool &);// = delete; - thread_pool(thread_pool &&);// = delete; - thread_pool & operator=(const thread_pool &);// = delete; - thread_pool & operator=(thread_pool &&);// = delete; - - void set_thread(int i) { - std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag - auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { - std::atomic & _flag = *flag; - std::function * _f; - bool isPop = this->q.pop(_f); - while (true) { - while (isPop) { // if there is anything in the queue - std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred - (*_f)(i); - - if (_flag) - return; // the thread is wanted to stop, return even if the queue is not empty yet - else - isPop = this->q.pop(_f); - } - - // the queue is empty here, wait for the next command - std::unique_lock lock(this->mutex); - ++this->nWaiting; - this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); - --this->nWaiting; - - if (!isPop) - return; // if the queue is empty and this->isDone == true or *flag then return - } - }; - this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() - } - - void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } - - std::vector> threads; - std::vector>> flags; - mutable boost::lockfree::queue *> q; - std::atomic isDone; - std::atomic isStop; - std::atomic nWaiting; // how many threads are waiting - - std::mutex mutex; - std::condition_variable cv; - }; - -} - -#endif // __ctpl_thread_pool_H__ -