Executor::~Executor()
{
- for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
- delete *iter;
}
//! Execute a graph waiting for completion
if (debug > 2) _displayDot(graph);
{ // --- Critical section
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
- _tasks=graph->getNextTasks(isMore);
- graph->selectRunnableTasks(_tasks);
- FilterTasksConsideringContainers(_tasks);
+ std::vector<Task *> tasks = graph->getNextTasks(isMore);
+ graph->selectRunnableTasks(tasks);
+ filterTasksConsideringContainers(tasks);
+ _tasks = tasks;
numberAllTasks=_numberOfRunningTasks+_tasks.size();
} // --- End of critical section
if (debug > 2) _displayDot(graph);
DEBTRACE("---");
}
-//! not implemented
-
-void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
-{
- /*_mutexForNbOfConcurrentThreads.lock();
- _groupOfAllThreadsCreated.remove(thread);
- delete thread;
- _mutexForNbOfConcurrentThreads.unlock();*/
-}
-
//! must be used protected by _mutexForSchedulerUpdate!
_numberOfEndedTasks++;
}
-//! number of running tasks
+int Executor::getMaxNbOfThreads() const
+{
+ return (int)_maxNbThreads;
+}
+
+void Executor::setMaxNbOfThreads(int maxNbThreads)
+{
+ _maxNbThreads = static_cast< std::uint32_t >(maxNbThreads);
+}
int Executor::getNbOfThreads()
{
- int ret;
+ int ret = 0;
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForNbOfConcurrentThreads);
_isRunningunderExternalControl=true;
- ret = _groupOfAllThreadsCreated.size();
return ret;
}
+//! number of running tasks
+int Executor::getNumberOfRunningTasks()
+{
+ YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+ _isRunningunderExternalControl=true;
+ return _numberOfRunningTasks;
+}
+
/*!
* This thread is NOT supposed to be detached !
*/
} // --- End of critical section (change state)
- //execInst->notifyEndOfThread(0);
Thread::exit(0);
return 0;
}
*
* \param [in,out] tsks - list of tasks to be
*/
-void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
+void Executor::filterTasksConsideringContainers(std::vector<Task *>& tsks)
{
std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
for(auto cur : tsks)
const std::vector<Task *>& curtsks(it.second);
if(!curhpc)
{
- ret.insert(ret.end(),curtsks.begin(),curtsks.end());
+ std::uint32_t nbThreadsRunning = _tasks.size();
+ std::uint32_t nbOfFreeSpace = _maxNbThreads - min(_maxNbThreads,nbThreadsRunning);
+ std::uint32_t nbOfCandidates = static_cast<std::uint32_t>( curtsks.size() );
+ std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace);
+ DEBTRACE("nb threads running: " << nbThreadsRunning);
+ DEBTRACE("MaxNbThreads: " << _maxNbThreads);
+ DEBTRACE("nbOfFreeSpace: " << nbOfFreeSpace);
+ DEBTRACE("nbOfCandidates: " << nbOfCandidates);
+ DEBTRACE("nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched);
+ ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched);
}
else
{
if(_runningTasks.find(t) == _runningTasks.end())
_tasks.push_back(t);
// TODO: to be removed
- FilterTasksConsideringContainers(_tasks);
+ filterTasksConsideringContainers(_tasks);
numberAllTasks=_numberOfRunningTasks+_tasks.size();
} // --- End of critical section
if (debug > 2) _displayDot(graph);
#include <fstream>
#include <ctime>
#include <chrono>
+#include <cstdint>
namespace WorkloadManager
{
YACS::BASES::Condition _condForPilot;
YACS::BASES::Mutex _mutexForSchedulerUpdate;
YACS::BASES::Mutex _mutexForTrace;
+ std::uint32_t _maxNbThreads = 10000;
bool _toContinue;
bool _isOKToEnd;
bool _stopOnErrorRequested;
std::list<std::string> _listOfTasksToLoad;
std::vector<Task *> _tasks;
std::vector<Task *> _tasksSave;
- std::list< YACS::BASES::Thread * > _groupOfAllThreadsCreated;
std::ofstream _trace;
std::string _dumpErrorFile;
bool _keepGoingOnFail;
void stopExecution();
bool saveState(const std::string& xmlFile);
bool loadState();
+ int getMaxNbOfThreads() const;
+ void setMaxNbOfThreads(int maxNbThreads);
int getNbOfThreads();
- int getNumberOfRunningTasks() const { return _numberOfRunningTasks; }
+ int getNumberOfRunningTasks();
void displayDot(Scheduler *graph);
void setStopOnError(bool dumpRequested=false, std::string xmlFile="");
void unsetStopOnError();
void launchTask(Task *task);
void wakeUp();
void sleepWhileNoEventsFromAnyRunningTask();
- void notifyEndOfThread(YACS::BASES::Thread *thread);
void traceExec(Task *task, const std::string& message, const std::string& placement);
void _displayDot(Scheduler *graph);
virtual void sendEvent(const std::string& event);
- static void FilterTasksConsideringContainers(std::vector<Task *>& tsks);
+ void filterTasksConsideringContainers(std::vector<Task *>& tsks);
static std::string ComputePlacement(Task *zeTask);
protected:
static void *functionForTaskLoad(void *);