Salome HOME
Allow to control max number of threads to be launched simultaneouly in case of big...
authorAnthony Geay <anthony.geay@edf.fr>
Mon, 26 Jun 2023 12:24:36 +0000 (14:24 +0200)
committerAnthony Geay <anthony.geay@edf.fr>
Tue, 22 Aug 2023 10:54:53 +0000 (12:54 +0200)
src/engine/DynParaLoop.cxx
src/engine/Executor.cxx
src/engine/Executor.hxx

index 3ea0a64ec2ce1172a5062bc940d51381a5558041..46efdac6dda777b2daaec0dfdc1fcce879d49cf6 100644 (file)
@@ -425,6 +425,7 @@ DynParaLoop::TypeOfNode DynParaLoop::getIdentityOfNotifyerNode(const Node *node,
   for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++,id++)
     if (*iter==node)
       return FINALIZE_NODE;
+  throw Exception("DynParaLoop::getIdentityOfNotifyerNode: unrecognized node !");
 }
 
 void DynParaLoop::setWeight(double loopWeight)
index 6b43fb0ee9eb9e3cbe7ee451198e6e9485229c76..1dbd5957c67d5e2a0d4100c4c4024ba4c4e4816d 100644 (file)
@@ -98,8 +98,6 @@ Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_n
 
 Executor::~Executor()
 {
-  for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
-    delete *iter;
 }
 
 //! Execute a graph waiting for completion
@@ -295,9 +293,10 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
       if (debug > 2) _displayDot(graph);
       { // --- Critical section
         YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
-        _tasks=graph->getNextTasks(isMore);
-        graph->selectRunnableTasks(_tasks);
-        FilterTasksConsideringContainers(_tasks);
+        std::vector<Task *> tasks = graph->getNextTasks(isMore);
+        graph->selectRunnableTasks(tasks);
+        filterTasksConsideringContainers(tasks);
+        _tasks = tasks;
         numberAllTasks=_numberOfRunningTasks+_tasks.size();
       } // --- End of critical section
       if (debug > 2) _displayDot(graph);
@@ -1094,16 +1093,6 @@ void Executor::sleepWhileNoEventsFromAnyRunningTask()
   DEBTRACE("---");
 }
 
-//! not implemented
-
-void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
-{
-  /*_mutexForNbOfConcurrentThreads.lock();
-  _groupOfAllThreadsCreated.remove(thread);
-  delete thread;
-  _mutexForNbOfConcurrentThreads.unlock();*/
-}
-
 
 //! must be used protected by _mutexForSchedulerUpdate!
 
@@ -1119,17 +1108,32 @@ void Executor::wakeUp()
     _numberOfEndedTasks++;
 }
 
-//! number of running tasks
+int Executor::getMaxNbOfThreads() const
+{
+  return (int)_maxNbThreads;
+}
+
+void Executor::setMaxNbOfThreads(int maxNbThreads)
+{
+  _maxNbThreads = static_cast< std::uint32_t >(maxNbThreads);
+}
 
 int Executor::getNbOfThreads()
 {
-  int ret;
+  int ret = 0;
   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
   _isRunningunderExternalControl=true;
-  ret = _groupOfAllThreadsCreated.size();
   return ret;
 }
 
+//! number of running tasks
+int Executor::getNumberOfRunningTasks()
+{
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  _isRunningunderExternalControl=true;
+  return _numberOfRunningTasks;
+}
+
 /*!
  * This thread is NOT supposed to be detached !
  */
@@ -1290,7 +1294,6 @@ void *Executor::functionForTaskExecution(void *arg)
 
   } // --- End of critical section (change state)
 
-  //execInst->notifyEndOfThread(0);
   Thread::exit(0);
   return 0;
 }
@@ -1346,7 +1349,7 @@ struct HPCCompare
  *
  * \param [in,out] tsks - list of tasks to be
  */
-void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
+void Executor::filterTasksConsideringContainers(std::vector<Task *>& tsks)
 {
   std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
   for(auto cur : tsks)
@@ -1375,7 +1378,16 @@ void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
       const std::vector<Task *>& curtsks(it.second);
       if(!curhpc)
         {
-          ret.insert(ret.end(),curtsks.begin(),curtsks.end());
+          std::uint32_t nbThreadsRunning = _tasks.size();
+          std::uint32_t nbOfFreeSpace = _maxNbThreads - min(_maxNbThreads,nbThreadsRunning);
+          std::uint32_t nbOfCandidates = static_cast<std::uint32_t>( curtsks.size() );
+          std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace);
+          DEBTRACE("nb threads running: " << nbThreadsRunning);
+          DEBTRACE("MaxNbThreads: " << _maxNbThreads);
+          DEBTRACE("nbOfFreeSpace: " << nbOfFreeSpace);
+          DEBTRACE("nbOfCandidates: " << nbOfCandidates);
+          DEBTRACE("nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched);
+          ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched);
         }
       else
         {
@@ -1745,7 +1757,7 @@ void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch)
           if(_runningTasks.find(t) == _runningTasks.end())
             _tasks.push_back(t);
         // TODO: to be removed
-        FilterTasksConsideringContainers(_tasks);
+        filterTasksConsideringContainers(_tasks);
         numberAllTasks=_numberOfRunningTasks+_tasks.size();
       } // --- End of critical section
       if (debug > 2) _displayDot(graph);
index d861139a35b6a37b7d71a38775c314c0b39d9d7f..462a7500318ec2b0e9a12e8e48d3461fcf82ff56 100644 (file)
@@ -38,6 +38,7 @@
 #include <fstream>
 #include <ctime>
 #include <chrono>
+#include <cstdint>
 
 namespace WorkloadManager
 {
@@ -71,6 +72,7 @@ namespace YACS
       YACS::BASES::Condition _condForPilot;
       YACS::BASES::Mutex _mutexForSchedulerUpdate;
       YACS::BASES::Mutex _mutexForTrace;
+      std::uint32_t _maxNbThreads = 10000;
       bool _toContinue;
       bool _isOKToEnd;
       bool _stopOnErrorRequested;
@@ -88,7 +90,6 @@ namespace YACS
       std::list<std::string> _listOfTasksToLoad;
       std::vector<Task *> _tasks; 
       std::vector<Task *> _tasksSave; 
-      std::list< YACS::BASES::Thread * > _groupOfAllThreadsCreated;
       std::ofstream _trace;
       std::string _dumpErrorFile;
       bool _keepGoingOnFail;
@@ -117,8 +118,10 @@ namespace YACS
       void stopExecution();
       bool saveState(const std::string& xmlFile);
       bool loadState();
+      int getMaxNbOfThreads() const;
+      void setMaxNbOfThreads(int maxNbThreads);
       int getNbOfThreads();
-      int getNumberOfRunningTasks() const { return _numberOfRunningTasks; }
+      int getNumberOfRunningTasks();
       void displayDot(Scheduler *graph);
       void setStopOnError(bool dumpRequested=false, std::string xmlFile="");
       void unsetStopOnError();
@@ -145,11 +148,10 @@ namespace YACS
       void launchTask(Task *task);
       void wakeUp();
       void sleepWhileNoEventsFromAnyRunningTask();
-      void notifyEndOfThread(YACS::BASES::Thread *thread);
       void traceExec(Task *task, const std::string& message, const std::string& placement);
       void _displayDot(Scheduler *graph);
       virtual void sendEvent(const std::string& event);
-      static void FilterTasksConsideringContainers(std::vector<Task *>& tsks);
+      void filterTasksConsideringContainers(std::vector<Task *>& tsks);
       static std::string ComputePlacement(Task *zeTask);
     protected:
       static void *functionForTaskLoad(void *);