From: Anthony Geay Date: Mon, 26 Jun 2023 12:24:36 +0000 (+0200) Subject: Allow to control max number of threads to be launched simultaneouly in case of big... X-Git-Tag: emc2p_1.4.0-rc1~8 X-Git-Url: http://git.salome-platform.org/gitweb/?p=modules%2Fyacs.git;a=commitdiff_plain;h=eff01af6f981c2ddc0e208bd56713883e1d20b86 Allow to control max number of threads to be launched simultaneouly in case of big graph with millions of // tasks. --- diff --git a/src/engine/DynParaLoop.cxx b/src/engine/DynParaLoop.cxx index 3ea0a64ec..46efdac6d 100644 --- a/src/engine/DynParaLoop.cxx +++ b/src/engine/DynParaLoop.cxx @@ -425,6 +425,7 @@ DynParaLoop::TypeOfNode DynParaLoop::getIdentityOfNotifyerNode(const Node *node, for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++,id++) if (*iter==node) return FINALIZE_NODE; + throw Exception("DynParaLoop::getIdentityOfNotifyerNode: unrecognized node !"); } void DynParaLoop::setWeight(double loopWeight) diff --git a/src/engine/Executor.cxx b/src/engine/Executor.cxx index 6b43fb0ee..1dbd5957c 100644 --- a/src/engine/Executor.cxx +++ b/src/engine/Executor.cxx @@ -98,8 +98,6 @@ Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_n Executor::~Executor() { - for(list::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++) - delete *iter; } //! Execute a graph waiting for completion @@ -295,9 +293,10 @@ void Executor::RunB(Scheduler *graph,int debug, bool fromScratch) if (debug > 2) _displayDot(graph); { // --- Critical section YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); - _tasks=graph->getNextTasks(isMore); - graph->selectRunnableTasks(_tasks); - FilterTasksConsideringContainers(_tasks); + std::vector tasks = graph->getNextTasks(isMore); + graph->selectRunnableTasks(tasks); + filterTasksConsideringContainers(tasks); + _tasks = tasks; numberAllTasks=_numberOfRunningTasks+_tasks.size(); } // --- End of critical section if (debug > 2) _displayDot(graph); @@ -1094,16 +1093,6 @@ void Executor::sleepWhileNoEventsFromAnyRunningTask() DEBTRACE("---"); } -//! not implemented - -void Executor::notifyEndOfThread(YACS::BASES::Thread *thread) -{ - /*_mutexForNbOfConcurrentThreads.lock(); - _groupOfAllThreadsCreated.remove(thread); - delete thread; - _mutexForNbOfConcurrentThreads.unlock();*/ -} - //! must be used protected by _mutexForSchedulerUpdate! @@ -1119,17 +1108,32 @@ void Executor::wakeUp() _numberOfEndedTasks++; } -//! number of running tasks +int Executor::getMaxNbOfThreads() const +{ + return (int)_maxNbThreads; +} + +void Executor::setMaxNbOfThreads(int maxNbThreads) +{ + _maxNbThreads = static_cast< std::uint32_t >(maxNbThreads); +} int Executor::getNbOfThreads() { - int ret; + int ret = 0; YACS::BASES::AutoLocker alck(&_mutexForNbOfConcurrentThreads); _isRunningunderExternalControl=true; - ret = _groupOfAllThreadsCreated.size(); return ret; } +//! number of running tasks +int Executor::getNumberOfRunningTasks() +{ + YACS::BASES::AutoLocker alck(&_mutexForSchedulerUpdate); + _isRunningunderExternalControl=true; + return _numberOfRunningTasks; +} + /*! * This thread is NOT supposed to be detached ! */ @@ -1290,7 +1294,6 @@ void *Executor::functionForTaskExecution(void *arg) } // --- End of critical section (change state) - //execInst->notifyEndOfThread(0); Thread::exit(0); return 0; } @@ -1346,7 +1349,7 @@ struct HPCCompare * * \param [in,out] tsks - list of tasks to be */ -void Executor::FilterTasksConsideringContainers(std::vector& tsks) +void Executor::filterTasksConsideringContainers(std::vector& tsks) { std::map, HPCCompare > m; for(auto cur : tsks) @@ -1375,7 +1378,16 @@ void Executor::FilterTasksConsideringContainers(std::vector& tsks) const std::vector& curtsks(it.second); if(!curhpc) { - ret.insert(ret.end(),curtsks.begin(),curtsks.end()); + std::uint32_t nbThreadsRunning = _tasks.size(); + std::uint32_t nbOfFreeSpace = _maxNbThreads - min(_maxNbThreads,nbThreadsRunning); + std::uint32_t nbOfCandidates = static_cast( curtsks.size() ); + std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace); + DEBTRACE("nb threads running: " << nbThreadsRunning); + DEBTRACE("MaxNbThreads: " << _maxNbThreads); + DEBTRACE("nbOfFreeSpace: " << nbOfFreeSpace); + DEBTRACE("nbOfCandidates: " << nbOfCandidates); + DEBTRACE("nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched); + ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched); } else { @@ -1745,7 +1757,7 @@ void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch) if(_runningTasks.find(t) == _runningTasks.end()) _tasks.push_back(t); // TODO: to be removed - FilterTasksConsideringContainers(_tasks); + filterTasksConsideringContainers(_tasks); numberAllTasks=_numberOfRunningTasks+_tasks.size(); } // --- End of critical section if (debug > 2) _displayDot(graph); diff --git a/src/engine/Executor.hxx b/src/engine/Executor.hxx index d861139a3..462a75003 100644 --- a/src/engine/Executor.hxx +++ b/src/engine/Executor.hxx @@ -38,6 +38,7 @@ #include #include #include +#include namespace WorkloadManager { @@ -71,6 +72,7 @@ namespace YACS YACS::BASES::Condition _condForPilot; YACS::BASES::Mutex _mutexForSchedulerUpdate; YACS::BASES::Mutex _mutexForTrace; + std::uint32_t _maxNbThreads = 10000; bool _toContinue; bool _isOKToEnd; bool _stopOnErrorRequested; @@ -88,7 +90,6 @@ namespace YACS std::list _listOfTasksToLoad; std::vector _tasks; std::vector _tasksSave; - std::list< YACS::BASES::Thread * > _groupOfAllThreadsCreated; std::ofstream _trace; std::string _dumpErrorFile; bool _keepGoingOnFail; @@ -117,8 +118,10 @@ namespace YACS void stopExecution(); bool saveState(const std::string& xmlFile); bool loadState(); + int getMaxNbOfThreads() const; + void setMaxNbOfThreads(int maxNbThreads); int getNbOfThreads(); - int getNumberOfRunningTasks() const { return _numberOfRunningTasks; } + int getNumberOfRunningTasks(); void displayDot(Scheduler *graph); void setStopOnError(bool dumpRequested=false, std::string xmlFile=""); void unsetStopOnError(); @@ -145,11 +148,10 @@ namespace YACS void launchTask(Task *task); void wakeUp(); void sleepWhileNoEventsFromAnyRunningTask(); - void notifyEndOfThread(YACS::BASES::Thread *thread); void traceExec(Task *task, const std::string& message, const std::string& placement); void _displayDot(Scheduler *graph); virtual void sendEvent(const std::string& event); - static void FilterTasksConsideringContainers(std::vector& tsks); + void filterTasksConsideringContainers(std::vector& tsks); static std::string ComputePlacement(Task *zeTask); protected: static void *functionForTaskLoad(void *);