]> SALOME platform Git repositories - modules/smesh.git/commitdiff
Salome HOME
Replacing ctpl by boost::thread_pool
authorYoann Audouin <yoann.audouin@edf.fr>
Mon, 12 Sep 2022 14:17:20 +0000 (16:17 +0200)
committerYoann Audouin <yoann.audouin@edf.fr>
Mon, 12 Sep 2022 14:17:20 +0000 (16:17 +0200)
src/SMESH/CMakeLists.txt
src/SMESH/SMESH_Gen.cxx
src/SMESH/SMESH_Gen.hxx
src/SMESH/SMESH_Mesh.hxx
src/SMESH/ctpl.h [deleted file]

index b8a9fdf7997c28dbb57004093995f02d2bd7fa8f..ab288a011d22da8af380aa85f21eaaa9f8dd192c 100644 (file)
@@ -90,7 +90,6 @@ SET(SMESHimpl_HEADERS
   SMESH_SMESH.hxx
   MG_ADAPT.hxx
   SMESH_Homard.hxx
-  ctpl.h
   DriverMesh.hxx
   DriverStep.hxx
 )
index eb292e928b949466984adbe96c49bc45ec050251..b7433ae7045945d70c7f58c496e16c9b65f5a41f 100644 (file)
@@ -49,6 +49,7 @@
 
 #include "memoire.h"
 #include <chrono>
+#include <functional>
 
 #ifdef WIN32
   #include <windows.h>
@@ -58,6 +59,7 @@
 
 using namespace std;
 #include <boost/filesystem.hpp>
+#include <boost/asio.hpp>
 namespace fs = boost::filesystem;
 
 // Environment variable separator
@@ -247,7 +249,6 @@ bool SMESH_Gen::Compute(SMESH_Mesh &                aMesh,
     // ===============================================
 
     TopAbs_ShapeEnum previousShapeType = TopAbs_VERTEX;
-    std::vector<std::future<void>> pending;
     int nbThreads = aMesh.GetNbThreads();
     auto begin = std::chrono::high_resolution_clock::now();
 
@@ -269,9 +270,7 @@ bool SMESH_Gen::Compute(SMESH_Mesh &                aMesh,
       //DEBUG std::cout << "Shape Type" << shapeType << " previous" << previousShapeType << std::endl;
       if ((aMesh.IsParallel()||nbThreads!=0) && shapeType != previousShapeType) {
         // Waiting for all threads for the previous type to end
-        for(auto &it: pending){
-          it.wait();
-        }
+        aMesh.wait();
 
         std::string file_name;
         switch(previousShapeType){
@@ -298,7 +297,6 @@ bool SMESH_Gen::Compute(SMESH_Mesh &                aMesh,
         }
         //Resetting threaded pool info
         previousShapeType = shapeType;
-        pending.clear();
       }
 
       // check for preview dimension limitations
@@ -311,9 +309,11 @@ bool SMESH_Gen::Compute(SMESH_Mesh &                aMesh,
       }
       if(aMesh.IsParallel())
       {
-        pending.push_back(aMesh._pool->push(compute_function, smToCompute, computeEvent,
-                             shapeSM, aShapeOnly, allowedSubShapes,
-                             aShapesId));
+        std::cout << "Submitting thread function " << std::endl;
+        boost::asio::post(*(aMesh._pool), [](){std::cerr<< "In Here" << std::endl;});
+        boost::asio::post(*(aMesh._pool), std::bind(compute_function, 1, smToCompute, computeEvent,
+                          shapeSM, aShapeOnly, allowedSubShapes,
+                          aShapesId));
       } else {
         auto begin2 = std::chrono::high_resolution_clock::now();
 
@@ -334,10 +334,7 @@ bool SMESH_Gen::Compute(SMESH_Mesh &                aMesh,
     // TODO: Check error handling in parallel mode
     if(aMesh.IsParallel()){
       // Waiting for the thread for Solids to finish
-      for(auto &it:pending){
-        it.wait();
-      }
-      pending.clear();
+      aMesh.wait();
     }
 
     aMesh.GetMeshDS()->Modified();
index 02f70250d9a59112fb4e23409b0918e72e4fabf5..6bb0fe1c40544a0c4e5c0e649f0bec1e29b3eca2 100644 (file)
@@ -34,8 +34,6 @@
 
 #include "SMESH_Algo.hxx"
 #include "SMESH_ComputeError.hxx"
-#include "ctpl.h"
-
 
 #include <map>
 #include <list>
index f6847260b3672e806c36999891d2c7dfaddb1dba..533b5648f02c5308522d75beba49da6df698c999 100644 (file)
 
 #include "MEDCouplingMemArray.hxx"
 
-#include "ctpl.h"
-
 #include <map>
 #include <list>
 #include <vector>
 #include <ostream>
 #include <boost/filesystem.hpp>
+#include <boost/asio/thread_pool.hpp>
 
 #ifdef WIN32
 #pragma warning(disable:4251) // Warning DLL Interface ...
@@ -385,7 +384,7 @@ class SMESH_EXPORT SMESH_Mesh
 
   std::ostream& Dump(std::ostream & save);
 
-  // Data for parallel computation
+  // Parallel computation functions
 
   void Lock() {_my_lock.lock();};
   void Unlock() {_my_lock.unlock();};
@@ -393,15 +392,16 @@ class SMESH_EXPORT SMESH_Mesh
   int GetNbThreads(){return _NbThreads;};
   void SetNbThreads(int nbThreads){_NbThreads=nbThreads;};
 
-  void InitPoolThreads(){_pool = new ctpl::thread_pool(_NbThreads);};
+  void InitPoolThreads(){_pool = new boost::asio::thread_pool(_NbThreads);};
   void DeletePoolThreads(){delete _pool;};
 
+  void wait(){_pool->join(); DeletePoolThreads(); InitPoolThreads(); }
+
   bool IsParallel(){return _NbThreads > 0;}
 
   // Temporary folder used during parallel Computation
   boost::filesystem::path tmp_folder;
-  // TODO: Replace by number of thread
-  ctpl::thread_pool *     _pool = nullptr; //thread pool for computation
+  boost::asio::thread_pool *     _pool = nullptr; //thread pool for computation
 
 
 private:
diff --git a/src/SMESH/ctpl.h b/src/SMESH/ctpl.h
deleted file mode 100644 (file)
index 64f650d..0000000
+++ /dev/null
@@ -1,240 +0,0 @@
-
-/*********************************************************
- *
- *  Copyright (C) 2014 by Vitaliy Vitsentiy
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *********************************************************/
-
-
-#ifndef __ctpl_thread_pool_H__
-#define __ctpl_thread_pool_H__
-
-#include <functional>
-#include <thread>
-#include <atomic>
-#include <vector>
-#include <memory>
-#include <exception>
-#include <future>
-#include <mutex>
-#include <boost/lockfree/queue.hpp>
-
-
-#ifndef _ctplThreadPoolLength_
-#define _ctplThreadPoolLength_  100
-#endif
-
-
-// thread pool to run user's functors with signature
-//      ret func(int id, other_params)
-// where id is the index of the thread that runs the functor
-// ret is some return type
-
-
-namespace ctpl {
-
-    class thread_pool {
-
-    public:
-
-        thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
-        thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
-
-        // the destructor waits for all the functions in the queue to be finished
-        ~thread_pool() {
-            this->stop(true);
-        }
-
-        // get the number of running threads in the pool
-        int size() { return static_cast<int>(this->threads.size()); }
-
-        // number of idle threads
-        int n_idle() { return this->nWaiting; }
-        std::thread & get_thread(int i) { return *this->threads[i]; }
-
-        // change the number of threads in the pool
-        // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
-        // nThreads must be >= 0
-        void resize(int nThreads) {
-            if (!this->isStop && !this->isDone) {
-                int oldNThreads = static_cast<int>(this->threads.size());
-                if (oldNThreads <= nThreads) {  // if the number of threads is increased
-                    this->threads.resize(nThreads);
-                    this->flags.resize(nThreads);
-
-                    for (int i = oldNThreads; i < nThreads; ++i) {
-                        this->flags[i] = std::make_shared<std::atomic<bool>>(false);
-                        this->set_thread(i);
-                    }
-                }
-                else {  // the number of threads is decreased
-                    for (int i = oldNThreads - 1; i >= nThreads; --i) {
-                        *this->flags[i] = true;  // this thread will finish
-                        this->threads[i]->detach();
-                    }
-                    {
-                        // stop the detached threads that were waiting
-                        std::unique_lock<std::mutex> lock(this->mutex);
-                        this->cv.notify_all();
-                    }
-                    this->threads.resize(nThreads);  // safe to delete because the threads are detached
-                    this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
-                }
-            }
-        }
-
-        // empty the queue
-        void clear_queue() {
-            std::function<void(int id)> * _f;
-            while (this->q.pop(_f))
-                delete _f;  // empty the queue
-        }
-
-        // pops a functional wraper to the original function
-        std::function<void(int)> pop() {
-            std::function<void(int id)> * _f = nullptr;
-            this->q.pop(_f);
-            std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
-            
-            std::function<void(int)> f;
-            if (_f)
-                f = *_f;
-            return f;
-        }
-
-
-        // wait for all computing threads to finish and stop all threads
-        // may be called asyncronously to not pause the calling thread while waiting
-        // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
-        void stop(bool isWait = false) {
-            if (!isWait) {
-                if (this->isStop)
-                    return;
-                this->isStop = true;
-                for (int i = 0, n = this->size(); i < n; ++i) {
-                    *this->flags[i] = true;  // command the threads to stop
-                }
-                this->clear_queue();  // empty the queue
-            }
-            else {
-                if (this->isDone || this->isStop)
-                    return;
-                this->isDone = true;  // give the waiting threads a command to finish
-            }
-            {
-                std::unique_lock<std::mutex> lock(this->mutex);
-                this->cv.notify_all();  // stop all waiting threads
-            }
-            for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
-                if (this->threads[i]->joinable())
-                    this->threads[i]->join();
-            }
-            // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
-            // therefore delete them here
-            this->clear_queue();
-            this->threads.clear();
-            this->flags.clear();
-        }
-
-        template<typename F, typename... Rest>
-        auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
-            auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
-                std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
-            );
-
-            auto _f = new std::function<void(int id)>([pck](int id) {
-                (*pck)(id);
-            });
-            this->q.push(_f);
-
-            std::unique_lock<std::mutex> lock(this->mutex);
-            this->cv.notify_one();
-
-            return pck->get_future();
-        }
-
-        // run the user's function that excepts argument int - id of the running thread. returned value is templatized
-        // operator returns std::future, where the user can get the result and rethrow the catched exceptins
-        template<typename F>
-        auto push(F && f) ->std::future<decltype(f(0))> {
-            auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
-
-            auto _f = new std::function<void(int id)>([pck](int id) {
-                (*pck)(id);
-            });
-            this->q.push(_f);
-
-            std::unique_lock<std::mutex> lock(this->mutex);
-            this->cv.notify_one();
-
-            return pck->get_future();
-        }
-
-
-    private:
-
-        // deleted
-        thread_pool(const thread_pool &);// = delete;
-        thread_pool(thread_pool &&);// = delete;
-        thread_pool & operator=(const thread_pool &);// = delete;
-        thread_pool & operator=(thread_pool &&);// = delete;
-
-        void set_thread(int i) {
-            std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);  // a copy of the shared ptr to the flag
-            auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
-                std::atomic<bool> & _flag = *flag;
-                std::function<void(int id)> * _f;
-                bool isPop = this->q.pop(_f);
-                while (true) {
-                    while (isPop) {  // if there is anything in the queue
-                        std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
-                        (*_f)(i);
-
-                        if (_flag)
-                            return;  // the thread is wanted to stop, return even if the queue is not empty yet
-                        else
-                            isPop = this->q.pop(_f);
-                    }
-
-                    // the queue is empty here, wait for the next command
-                    std::unique_lock<std::mutex> lock(this->mutex);
-                    ++this->nWaiting;
-                    this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
-                    --this->nWaiting;
-
-                    if (!isPop)
-                        return;  // if the queue is empty and this->isDone == true or *flag then return
-                }
-            };
-            this->threads[i].reset(new std::thread(f));  // compiler may not support std::make_unique()
-        }
-
-        void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
-
-        std::vector<std::unique_ptr<std::thread>> threads;
-        std::vector<std::shared_ptr<std::atomic<bool>>> flags;
-        mutable boost::lockfree::queue<std::function<void(int id)> *> q;
-        std::atomic<bool> isDone;
-        std::atomic<bool> isStop;
-        std::atomic<int> nWaiting;  // how many threads are waiting
-
-        std::mutex mutex;
-        std::condition_variable cv;
-    };
-
-}
-
-#endif // __ctpl_thread_pool_H__
-