# List of targets in this project we want to make visible to the rest of the world.
# They all have to be INSTALL'd with the option "EXPORT ${PROJECT_NAME}TargetGroup"
SET(_${PROJECT_NAME}_exposed_targets
- YACSBases YACSlibEngine
+ YACSBases YACSlibEngine YACSlibWorkloadmanager
)
IF(SALOME_YACS_USE_KERNEL)
INCLUDE_DIRECTORIES(
${PTHREAD_INCLUDE_DIR}
${PROJECT_SOURCE_DIR}/src/bases
+ ${PROJECT_SOURCE_DIR}/src
)
# libraries to link to
SET(_link_LIBRARIES
${PTHREAD_LIBRARIES}
YACSBases
+ YACSlibWorkloadmanager
)
# --- headers ---
string tracefile = "traceExec_";
tracefile += _mainSched->getName();
_trace.open(tracefile.c_str());
-#ifdef WIN32
- _start = timeGetTime();
-#else
- gettimeofday(&_start, NULL);
-#endif
-
+ _start = std::chrono::steady_clock::now();
} // --- End of critical section
if (debug > 1) _displayDot(graph);
if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
if (debug > 0) _displayDot(graph);
DEBTRACE("---");
- //loadTasks(_tasks);
loadParallelTasks(_tasks,this);
if (debug > 1) _displayDot(graph);
DEBTRACE("---");
Executor *execInst;
};
-void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
-{
- for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
- loadTask(*iter,execInst);
-}
-
void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
{
std::vector<Thread> ths(tasks.size());
if (cont)
containerName = cont->getName();
-#ifdef WIN32
- DWORD now = timeGetTime();
- double elapse = (now - _start)/1000.0;
-#else
- timeval now;
- gettimeofday(&now, NULL);
- double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
-#endif
+ std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
+ std::chrono::milliseconds millisec;
+ millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
+ double elapse = double(millisec.count()) / 1000.0;
{
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
_trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
placement=zeTask->getContainer()->getFullPlacementId(zeTask);
return placement;
}
+
+///////// NEW EXECUTOR ////////////////////////////////
+// Load a task on its container (new workload-manager execution path).
+// Notifies the scheduler of START, then calls load() and initService() on
+// the task. On any exception the task is aborted and the scheduler is
+// notified with ABORT. Only tasks in TOLOAD state are processed.
+// Called from NewTask::run, i.e. from a workload manager worker thread.
+void Executor::loadTask(Task *task)
+{
+  if(task->getState() != YACS::TOLOAD)
+    return;
+  traceExec(task, "state:TOLOAD", ComputePlacement(task));
+  {//Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched->notifyFrom(task,YACS::START,this);
+  }//End of critical section
+  try
+  {
+    traceExec(task, "load", ComputePlacement(task));
+    task->load();
+    traceExec(task, "initService", ComputePlacement(task));
+    task->initService();
+  }
+  catch(Exception& ex)
+  {
+    std::cerr << ex.what() << std::endl;
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT, this);
+      traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+    }//End of critical section
+  }
+  catch(...)
+  {
+    std::cerr << "Load failed" << std::endl;
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT, this);
+      traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
+    }//End of critical section
+  }
+}
+
+// Register a task as running: increments the running-task counter and adds
+// the task to _runningTasks, under the scheduler-update mutex. Called by the
+// main loop of newRun before handing the task over to the workload manager.
+void Executor::beginTask(Task *task)
+{
+  // --- Critical section
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  _numberOfRunningTasks++;
+  _runningTasks.insert(task);
+  // --- End of critical section
+}
+
+// Finalize a task after execution (new executor path). Updates the task
+// state according to ev (FINISH -> finished(), ABORT -> aborted() plus error
+// bookkeeping), notifies the scheduler, unregisters the task from
+// _runningTasks, and wakes up the main loop / pilot when appropriate.
+// The whole body runs under _mutexForSchedulerUpdate; called from
+// NewTask::run on a workload manager worker thread.
+void Executor::endTask(Task *task, YACS::Event ev)
+{
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+  try
+  {
+    if (ev == YACS::FINISH) task->finished();
+    if (ev == YACS::ABORT)
+    {
+      // remember the error; optionally switch to step-by-step so the user
+      // can inspect the graph before it ends
+      _errorDetected = true;
+      if (_stopOnErrorRequested)
+      {
+        _execMode = YACS::STEPBYSTEP;
+        _isOKToEnd = true;
+      }
+      task->aborted();
+    }
+    //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
+    _mainSched->notifyFrom(task,ev,this);
+  }
+  catch(Exception& ex)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Error during notification" << std::endl;
+    std::cerr << ex.what() << std::endl;
+  }
+  catch(...)
+  {
+    //notify has failed : it is supposed to have set state
+    //so no need to do anything
+    std::cerr << "Notification failed" << std::endl;
+  }
+  _numberOfRunningTasks--;
+  _runningTasks.erase(task);
+  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks
+           << " _execMode: " << _execMode
+           << " _executorState: " << _executorState);
+  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
+  {
+    if (_executorState == YACS::WAITINGTASKS)
+    {
+      _executorState = YACS::PAUSED;
+      sendEvent("executor");
+      _condForPilot.notify_all();
+      if (_errorDetected &&
+          _stopOnErrorRequested &&
+          !_isRunningunderExternalControl)
+        _condForStepByStep.notify_all(); // exec thread may be on waitResume
+    }
+  }
+  if (_executorState != YACS::PAUSED)
+    wakeUp();
+}
+
+// Execute a previously loaded and connected task (new executor path):
+// switch it to ACTIVATED, apply the DynParaLoop scope if enabled, run
+// task->execute(), then disconnect its service. Returns YACS::FINISH on
+// success or YACS::ABORT if execute() or disconnectService() failed.
+// For HomogeneousPoolContainer, the slot used by the task is released here.
+YACS::Event Executor::runTask(Task *task)
+{
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    task->begin(); //change state to ACTIVATED
+  }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+
+  if(getDPLScopeSensitive())
+  {
+    Node *node(dynamic_cast<Node *>(task));
+    ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
+    if(node!=0 && gfn!=0)
+      node->applyDPLScope(gfn);
+  }
+
+  YACS::Event ev=YACS::FINISH;
+  try
+  {
+    traceExec(task, "start execution",ComputePlacement(task));
+    task->execute();
+    traceExec(task, "end execution OK",ComputePlacement(task));
+  }
+  catch(Exception& ex)
+  {
+    std::cerr << "YACS Exception during execute" << std::endl;
+    std::cerr << ex.what() << std::endl;
+    ev=YACS::ABORT;
+    string message = "end execution ABORT, ";
+    message += ex.what();
+    traceExec(task, message,ComputePlacement(task));
+  }
+  catch(...)
+  {
+    // Execution has failed
+    std::cerr << "Execution has failed: unknown reason" << std::endl;
+    ev=YACS::ABORT;
+    traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
+  }
+
+  // Disconnect task
+  try
+  {
+    DEBTRACE("task->disconnectService()");
+    task->disconnectService();
+    traceExec(task, "disconnectService",ComputePlacement(task));
+  }
+  catch(...)
+  {
+    // Disconnect has failed
+    std::cerr << "disconnect has failed" << std::endl;
+    ev=YACS::ABORT;
+    traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+  }
+  //
+
+  // NOTE(review): 'placement' is computed here but never used below -- dead
+  // local? Confirm whether a traceExec call was intended with it.
+  std::string placement(ComputePlacement(task));
+
+  // container management for HomogeneousPoolOfContainer
+
+  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
+  if(contC)
+  {
+    std::lock_guard<std::mutex> alckCont(contC->getLocker());
+    contC->release(task);
+  }
+
+  return ev;
+}
+
+// Establish the datastream (calcium-like) connections of a task by calling
+// connectService(). Only tasks in TOLOAD or TORECONNECT state are processed.
+// On failure the task is disconnected, aborted, and the scheduler notified;
+// if the task ends up in ERROR state, all its coupled tasks are aborted too.
+void Executor::makeDatastreamConnections(Task *task)
+{
+  YACS::StatesForNode state=task->getState();
+  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)
+    return;
+  try
+  {
+    task->connectService();
+    traceExec(task, "connectService",ComputePlacement(task));
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->connected();
+    }//End of critical section
+  }
+  catch(Exception& ex)
+  {
+    std::cerr << ex.what() << std::endl;
+    try
+    {
+      (task)->disconnectService();
+      traceExec(task, "disconnectService",ComputePlacement(task));
+    }
+    catch(...)
+    {
+      // Disconnect has failed
+      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT,this);
+    }//End of critical section
+  }
+  catch(...)
+  {
+    std::cerr << "Problem in connectService" << std::endl;
+    try
+    {
+      (task)->disconnectService();
+      traceExec(task, "disconnectService",ComputePlacement(task));
+    }
+    catch(...)
+    {
+      // Disconnect has failed
+      traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
+    }
+    {//Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      task->aborted();
+      _mainSched->notifyFrom(task,YACS::ABORT,this);
+    }//End of critical section
+  }
+  if(task->getState() == YACS::ERROR)
+  {
+    //try to put all coupled tasks in error
+    // NOTE(review): the traces below use ComputePlacement(task), not
+    // ComputePlacement(t) -- confirm this is intended for coupled tasks.
+    std::set<Task*> coupledSet;
+    task->getCoupledTasks(coupledSet);
+    for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
+    {
+      Task* t=*it;
+      if(t == task)continue;
+      if(t->getState() == YACS::ERROR)continue;
+      try
+      {
+        t->disconnectService();
+        traceExec(t, "disconnectService",ComputePlacement(task));
+      }
+      catch(...)
+      {
+        // Disconnect has failed
+        traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
+      }
+      {//Critical section
+        YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+        t->aborted();
+        _mainSched->notifyFrom(t,YACS::ABORT,this);
+      }//End of critical section
+      traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
+    }
+  }
+  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
+}
+
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+#include "Runtime.hxx"
+// Fill the WorkloadManager with the compute nodes known by the current
+// runtime: each (name, nbCores) pair from the runtime catalog becomes a
+// Resource with a sequential id. Throws if no runtime is available.
+static
+void loadResources(WorkloadManager::WorkloadManager& wm)
+{
+  Runtime *r(getRuntime());
+  if(!r)
+    throw YACS::Exception("loadResources : no runtime !");
+  std::vector< std::pair<std::string,int> > data(r->getCatalogOfComputeNodes());
+  int id = 0;
+  for(const std::pair<std::string,int>& res : data)
+  {
+    WorkloadManager::Resource newResource;
+    newResource.name = res.first;
+    newResource.id = id;
+    id++;
+    newResource.nbCores = res.second;
+    wm.addResource(newResource);
+  }
+}
+
+// Adapter between a YACS engine task and the generic WorkloadManager::Task
+// interface. It drives the full life cycle of the wrapped task (load,
+// connect, run, end) through the Executor when the workload manager
+// schedules it. Instances delete themselves at the end of run().
+class NewTask : public WorkloadManager::Task
+{
+public:
+  NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
+  const WorkloadManager::ContainerType& type()const override;
+  void run(const WorkloadManager::Container& c)override;
+private:
+  WorkloadManager::ContainerType _type;   // container type reported to the algorithm
+  Executor& _executor;                    // executor driving the YACS side
+  YACS::ENGINE::Task * _yacsTask;         // wrapped task, not owned
+};
+
+// Build the adapter for one YACS task.
+// NOTE(review): neededCores is set to 0, so the allocation algorithm treats
+// every task as costless (isAllocPossible always true) -- confirm this
+// provisional setting is intended until real container types are mapped.
+NewTask::NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask)
+: _type()
+, _executor(executor)
+, _yacsTask(yacsTask)
+{
+  _type.neededCores = 0;
+  _type.id = 0;
+  _type.name = "test";
+}
+
+// Container type needed by this task, used by the scheduling algorithm.
+const WorkloadManager::ContainerType& NewTask::type()const
+{
+  return _type;
+}
+
+// Run the whole life cycle of the wrapped YACS task: load, datastream
+// connections, execution, then notification of the result to the executor.
+// NOTE(review): the chosen container c is not used here -- placement is
+// recomputed on the YACS side; confirm this is intended.
+void NewTask::run(const WorkloadManager::Container& c)
+{
+  _executor.loadTask(_yacsTask);
+  _executor.makeDatastreamConnections(_yacsTask);
+  YACS::Event ev = _executor.runTask(_yacsTask);
+  _executor.endTask(_yacsTask, ev);
+  delete this; // provisional: the adapter deletes itself when done
+}
+
+// Main loop of the new workload-manager based executor. Initializes the
+// graph and the trace file, builds a WorkloadManager over the runtime's
+// resources, then repeatedly collects runnable tasks from the scheduler and
+// submits them (wrapped in NewTask) to the workload manager until the graph
+// is finished or a stop is requested. Mirrors the structure of RunB.
+// \param graph : the scheduler/graph to execute
+// \param debug : verbosity level for _displayDot dumps
+// \param fromScratch : if true, init the graph before running
+void Executor::newRun(Scheduler *graph,int debug, bool fromScratch)
+{
+  DEBTRACE("Executor::newRun debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    _mainSched = graph;
+    _root = dynamic_cast<ComposedNode *>(_mainSched);
+    if (!_root) throw Exception("Executor::Run, Internal Error!");
+    _executorState = YACS::NOTYETINITIALIZED;
+    sendEvent("executor");
+    _toContinue=true;
+    _isOKToEnd = false;
+    _errorDetected = false;
+    _isWaitingEventsFromRunningTasks = false;
+    _numberOfRunningTasks = 0;
+    _runningTasks.clear();
+    _numberOfEndedTasks = 0;
+    string tracefile = "traceExec_";
+    tracefile += _mainSched->getName();
+    _trace.open(tracefile.c_str());
+    _start = std::chrono::steady_clock::now();
+  } // --- End of critical section
+
+  if (debug > 1) _displayDot(graph);
+
+  if (fromScratch)
+  {
+    try
+    {
+      graph->init();
+      graph->exUpdateState();
+    }
+    catch(Exception& ex)
+    {
+      DEBTRACE("exception: "<< (ex.what()));
+      _executorState = YACS::FINISHED;
+      sendEvent("executor");
+      throw;
+    }
+  }
+  _executorState = YACS::INITIALISED;
+  sendEvent("executor");
+
+  if (debug > 1) _displayDot(graph);
+
+  bool isMore;
+  int problemCount=0;
+  int numberAllTasks;
+
+  _executorState = YACS::RUNNING;
+  sendEvent("executor");
+
+  // the workload manager lives for the duration of this run only
+  WorkloadManager::DefaultAlgorithm algo;
+  WorkloadManager::WorkloadManager wlm(algo);
+  loadResources(wlm);
+  wlm.start();
+
+  while (_toContinue)
+  {
+    DEBTRACE("--- executor main loop");
+    sleepWhileNoEventsFromAnyRunningTask();
+    DEBTRACE("--- events...");
+    if (debug > 2) _displayDot(graph);
+    { // --- Critical section
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
+      graph->selectRunnableTasks(readyTasks);
+      _tasks.clear();
+      // keep only tasks not already handed over to the workload manager
+      for(Task * t : readyTasks)
+        if(_runningTasks.find(t) == _runningTasks.end())
+          _tasks.push_back(t);
+      numberAllTasks=_numberOfRunningTasks+_tasks.size();
+    } // --- End of critical section
+    if (debug > 2) _displayDot(graph);
+    DEBTRACE("--- events...");
+    if (_executorState == YACS::RUNNING)
+    {
+      if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
+      for(Task * task : _tasks)
+      {
+        beginTask(task);
+        // NewTask deletes itself at the end of its run()
+        NewTask* newTask = new NewTask(*this, task);
+        wlm.addTask(newTask);
+      }
+    }
+    if (debug > 1) _displayDot(graph);
+    { // --- Critical section
+      DEBTRACE("---");
+      YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+      //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
+      _toContinue = !graph->isFinished();
+
+      DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
+      DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
+      DEBTRACE("_toContinue: " << _toContinue);
+      if(_toContinue && numberAllTasks==0)
+      {
+        //Problem : no running tasks and no task to launch ??
+        // watchdog: give up after 25 consecutive empty iterations
+        problemCount++;
+        std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
+        //Pause to give a chance to interrupt
+        usleep(1000);
+        if(problemCount > 25)
+        {
+          // Too much problems encountered : stop execution
+          _toContinue=false;
+        }
+      }
+
+      if (! _toContinue)
+      {
+        _executorState = YACS::FINISHED;
+        sendEvent("executor");
+        _condForPilot.notify_all();
+      }
+    } // --- End of critical section
+    if (debug > 0) _displayDot(graph);
+    DEBTRACE("_toContinue: " << _toContinue);
+  }
+
+  wlm.stop();
+  DEBTRACE("End of main Loop");
+
+  { // --- Critical section
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForSchedulerUpdate);
+    if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
+    {
+      DEBTRACE("stop requested: End soon");
+      _executorState = YACS::STOPPED;
+      _toContinue = false;
+      sendEvent("executor");
+    }
+  } // --- End of critical section
+  if ( _dumpOnErrorRequested && _errorDetected)
+  {
+    saveState(_dumpErrorFile);
+  }
+  {
+    YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&_mutexForTrace);
+    _trace.close();
+  }
+  DEBTRACE("End of RunB thread");
+}
#include <string>
#include <fstream>
#include <ctime>
+#include <chrono>
namespace YACS
{
bool _keepGoingOnFail;
//! specifies if scope DynParaLoop is active or not. False by default.
bool _DPLScopeSensitive;
-#ifdef WIN32
- DWORD _start;
-#else
- timeval _start;
-#endif
+ std::chrono::steady_clock::time_point _start;
public:
Executor();
virtual ~Executor();
void RunA(Scheduler *graph,int debug=0, bool fromScratch=true);
- void RunW(Scheduler *graph,int debug=0, bool fromScratch=true) { RunB(graph, debug, fromScratch); }
+ void RunW(Scheduler *graph,int debug=0, bool fromScratch=true)
+ {
+ //RunB(graph, debug, fromScratch);
+ newRun(graph, debug, fromScratch);
+ }
void RunB(Scheduler *graph,int debug=0, bool fromScratch=true);
+ void newRun(Scheduler *graph,int debug=0, bool fromScratch=true);
void setKeepGoingProperty(bool newVal) { _keepGoingOnFail=newVal; }
bool getKeepGoingProperty() const { return _keepGoingOnFail; }
void setDPLScopeSensitive(bool newVal) { _DPLScopeSensitive=newVal; }
static int _maxThreads;
static size_t _threadStackSize;
YACS::BASES::Mutex& getTheMutexForSchedulerUpdate() { return _mutexForSchedulerUpdate; }
+ ///// new executor !!!!!
+ void loadTask(Task *task);
+ YACS::Event runTask(Task *task);
+ void makeDatastreamConnections(Task *task);
+ void beginTask(Task *task);
+ void endTask(Task *task, YACS::Event ev);
+ ////////////
protected:
bool checkBreakPoints();
void waitResume();
void loadTask(Task *task, const Executor *execInst);
- void loadTasks(const std::vector<Task *>& tasks, const Executor *execInst);
void loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst);
void launchTasks(const std::vector<Task*>& tasks);
void launchTask(Task *task);
std::vector< std::pair<std::string,int> > RuntimeForEngineIntegrationTest::getCatalogOfComputeNodes() const
{
-  throw Exception("RuntimeForEngineIntegrationTest not implemented !");
+  // Provisional stub for tests: report a single "localhost" resource with 8 cores.
+  std::vector< std::pair<std::string,int> > result(1);
+  std::pair<std::string,int> localhost;
+  localhost.first = "localhost";
+  localhost.second = 8;
+  result[0] = localhost;
+  return result;
+//  throw Exception("RuntimeForEngineIntegrationTest::getCatalogOfComputeNodes : not implemented !");
}
ElementaryNode* RuntimeForEngineIntegrationTest::createNode(const std::string& implementation, const std::string& name) throw (YACS::Exception)
std::vector< std::pair<std::string,int> > RuntimeForEngineTest::getCatalogOfComputeNodes() const
{
-  throw Exception("RuntimeForEngineTest::getCatalogOfComputeNodes : not implemented !");
+  // Provisional stub for tests: report a single "localhost" resource with 8 cores.
+  std::vector< std::pair<std::string,int> > result(1);
+  std::pair<std::string,int> localhost;
+  localhost.first = "localhost";
+  localhost.second = 8;
+  result[0] = localhost;
+  return result;
+//  throw Exception("RuntimeForEngineTest::getCatalogOfComputeNodes : not implemented !");
}
ElementaryNode* RuntimeForEngineTest::createNode(const string& implementation, const string& name) throw(YACS::Exception)
ADD_LIBRARY(YACSlibWorkloadmanager ${_wlm_sources})
TARGET_LINK_LIBRARIES(YACSlibWorkloadmanager ${_link_LIBRARIES})
INSTALL(TARGETS YACSlibWorkloadmanager EXPORT ${PROJECT_NAME}TargetGroup DESTINATION ${SALOME_INSTALL_LIBS})
-INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS})
+INSTALL(FILES ${_wlm_headers} DESTINATION ${SALOME_INSTALL_HEADERS}/workloadmanager)
IF(SALOME_BUILD_TESTS)
ADD_SUBDIRECTORY(Test)
#include "Task.hxx"
#include <stdexcept>
#include <limits>
+#include <algorithm>
namespace WorkloadManager
{
void DefaultAlgorithm::addTask(Task* t)
{
// put the tasks which need more cores in front.
- float newNeedCores = t->type()->neededCores;
+ float newNeedCores = t->type().neededCores;
if(_waitingTasks.empty())
_waitingTasks.push_back(t);
- else if(_waitingTasks.back()->type()->neededCores >= newNeedCores)
+ else if(_waitingTasks.back()->type().neededCores >= newNeedCores)
_waitingTasks.push_back(t);
else
{
std::list<Task*>::iterator it = _waitingTasks.begin();
- while(it != _waitingTasks.end() && (*it)->type()->neededCores >= newNeedCores)
+ while(it != _waitingTasks.end() && (*it)->type().neededCores >= newNeedCores)
it++;
_waitingTasks.insert(it, t);
}
return _waitingTasks.empty();
}
-void DefaultAlgorithm::addResource(Resource* r)
+// Register a resource in the scheduling pool.
+// NOTE(review): the former map-based version silently ignored duplicates;
+// this list version appends unconditionally -- confirm callers never add
+// the same resource twice.
+void DefaultAlgorithm::addResource(const Resource& r)
{
-  // add the resource. The operation is ignored if the resource already exists.
-  _resources.emplace(std::piecewise_construct,
-                     std::forward_as_tuple(r),
-                     std::forward_as_tuple(r)
-                     );
+  _resources.emplace_back(r);
}
WorkloadAlgorithm::LaunchInfo DefaultAlgorithm::chooseTask()
!result.taskFound && itTask != _waitingTasks.end();
itTask ++)
{
- const ContainerType* ctype = (*itTask)->type();
- std::map<const Resource *, ResourceLoadInfo>::iterator best_resource;
+ const ContainerType& ctype = (*itTask)->type();
+ std::list<ResourceLoadInfo>::iterator best_resource;
best_resource = _resources.end();
float best_cost = std::numeric_limits<float>::max();
+ bool isSupported = false;
for(auto itResource = _resources.begin();
itResource != _resources.end();
itResource++)
- if(itResource->second.isSupported(ctype))
+ if(itResource->isSupported(ctype))
{
- if(itResource->second.isAllocPossible(ctype))
+ if(itResource->isAllocPossible(ctype))
{
- float thisCost = itResource->second.cost(ctype);
+ float thisCost = itResource->cost(ctype);
if( best_cost > thisCost)
{
best_cost = thisCost;
chosenTaskIt = itTask;
result.task = (*itTask);
result.taskFound = true;
- result.worker.resource = best_resource->first;
+ result.worker.resource = best_resource->resource();
result.worker.type = ctype;
- result.worker.index = best_resource->second.alloc(ctype);
+ result.worker.index = best_resource->alloc(ctype);
+ }
+ else if(!isSupported)
+ {
+ // TODO: This task can never be run by any available resource.
}
}
if(result.taskFound)
void DefaultAlgorithm::liberate(const LaunchInfo& info)
{
+  // Give back to the resource the cores used by a finished task.
-  const Resource* r = info.worker.resource;
+  const Resource& r = info.worker.resource;
  unsigned int index = info.worker.index;
-  const ContainerType* ctype = info.worker.type;
-  std::map<const Resource* ,ResourceLoadInfo>::iterator it = _resources.find(r);
-  it->second.free(ctype, index);
+  const ContainerType& ctype = info.worker.type;
+  // NOTE(review): if r were absent, it == end() and the call below would be
+  // undefined behavior; relies on liberate only being called for launched tasks.
+  std::list<ResourceLoadInfo>::iterator it = std::find(_resources.begin(),
+                                                       _resources.end(),
+                                                       r);
+  it->free(ctype, index); // we are sure to find it
}
// ResourceInfoForContainer
DefaultAlgorithm::ResourceInfoForContainer::ResourceInfoForContainer
- (const Resource * r, const ContainerType* ctype)
+ (const Resource& r, const ContainerType& ctype)
: _ctype(ctype)
, _resource(r)
, _runningContainers()
unsigned int DefaultAlgorithm::ResourceInfoForContainer::maxContainers()const
{
-  return float(_resource->nbCores) / _ctype->neededCores;
+  // NOTE(review): neededCores can be 0 (the unit tests add a zero-cost
+  // container type); the float division then yields +inf and the conversion
+  // to unsigned int is undefined behavior -- confirm this path is guarded.
+  return float(_resource.nbCores) / _ctype.neededCores;
}
unsigned int DefaultAlgorithm::ResourceInfoForContainer::alloc()
// ResourceLoadInfo
-DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource * r)
+DefaultAlgorithm::ResourceLoadInfo::ResourceLoadInfo(const Resource& r)
: _resource(r)
, _load(0.0)
, _ctypes()
}
+// A resource can host a container type iff it has enough cores in total,
+// regardless of current load (see isAllocPossible for the load check).
bool DefaultAlgorithm::ResourceLoadInfo::isSupported
-                (const ContainerType* ctype)const
+                (const ContainerType& ctype)const
{
-  return ctype->neededCores <= _resource->nbCores ;
+  return ctype.neededCores <= _resource.nbCores ;
}
+// An allocation is possible right now iff the needed cores fit in the
+// remaining capacity (nbCores minus the current load).
bool DefaultAlgorithm::ResourceLoadInfo::isAllocPossible
-                (const ContainerType* ctype)const
+                (const ContainerType& ctype)const
{
-  return ctype->neededCores + _load <= _resource->nbCores;
+  return ctype.neededCores + _load <= _resource.nbCores;
}
+// Cost of choosing this resource = current load as a percentage of its
+// cores; the least loaded resource is the cheapest.
+// NOTE(review): the ctype parameter is not used in the formula.
float DefaultAlgorithm::ResourceLoadInfo::cost
-                (const ContainerType* ctype)const
+                (const ContainerType& ctype)const
{
-  return _load * 100.0 / float(_resource->nbCores);
+  return _load * 100.0 / float(_resource.nbCores);
}
+// Allocate a container slot of the given type on this resource: lazily
+// creates the per-type bookkeeping, adds the type's cost to the load and
+// returns the worker index chosen for the task.
unsigned int DefaultAlgorithm::ResourceLoadInfo::alloc
-                (const ContainerType* ctype)
+                (const ContainerType& ctype)
{
-  std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
-  it = _ctypes.find(ctype);
+  std::list<ResourceInfoForContainer>::iterator it = std::find(_ctypes.begin(),
+                                                               _ctypes.end(),
+                                                               ctype);
+  // add the type if not found
  if(it == _ctypes.end())
  {
-    // add the type if not found
-    it = _ctypes.emplace(std::piecewise_construct,
-                         std::forward_as_tuple(ctype),
-                         std::forward_as_tuple(_resource, ctype)
-                         ).first;
+    _ctypes.emplace_back(_resource, ctype);
+    // point to the newly appended element
+    it = _ctypes.end();
+    it--;
  }
-  _load += ctype->neededCores;
-  return it->second.alloc();
+  _load += ctype.neededCores;
+  return it->alloc();
}
+// Release a container slot previously returned by alloc(): subtracts the
+// type's cost from the load and frees the worker index.
+// NOTE(review): like liberate(), assumes the type is present; it == end()
+// would be undefined behavior.
void DefaultAlgorithm::ResourceLoadInfo::free
-                (const ContainerType* ctype, int index)
+                (const ContainerType& ctype, int index)
{
-  _load -= ctype->neededCores;
-  std::map<const ContainerType*, ResourceInfoForContainer>::iterator it;
-  it = _ctypes.find(ctype);
-  it->second.free(index);
+  _load -= ctype.neededCores;
+  std::list<ResourceInfoForContainer>::iterator it = std::find(_ctypes.begin(),
+                                                               _ctypes.end(),
+                                                               ctype);
+  it->free(index);
}
}
{
public:
void addTask(Task* t)override;
- void addResource(Resource* r)override;
+ void addResource(const Resource& r)override;
LaunchInfo chooseTask()override;
void liberate(const LaunchInfo& info)override;
bool empty()const override;
class ResourceInfoForContainer
{
public:
- ResourceInfoForContainer(const Resource * r, const ContainerType* ctype);
+ ResourceInfoForContainer(const Resource& r, const ContainerType& ctype);
unsigned int maxContainers()const;
unsigned int alloc();
void free(unsigned int index);
unsigned int nbRunningContainers()const;
bool isContainerRunning(unsigned int index)const;
+ bool operator<(const ResourceInfoForContainer& other)const
+ { return _ctype < other._ctype;}
+ bool operator==(const ContainerType& other)const
+ { return _ctype == other;}
+ const ContainerType& type()const { return _ctype;}
private:
- const ContainerType* _ctype;
- const Resource* _resource;
- std::set<unsigned int> _runningContainers;
+ ContainerType _ctype;
+ const Resource& _resource; // same ref as ResourceLoadInfo
+ std::set<unsigned int> _runningContainers; // 0 to max possible containers on this resource
unsigned int _firstFreeContainer;
};
class ResourceLoadInfo
{
public:
- ResourceLoadInfo(const Resource * r);
- bool isSupported(const ContainerType* ctype)const;
- bool isAllocPossible(const ContainerType* ctype)const;
- float cost(const ContainerType* ctype)const;
- unsigned int alloc(const ContainerType* ctype);
- void free(const ContainerType* ctype, int index);
+ ResourceLoadInfo(const Resource& r);
+ bool isSupported(const ContainerType& ctype)const;
+ bool isAllocPossible(const ContainerType& ctype)const;
+ float cost(const ContainerType& ctype)const;
+ unsigned int alloc(const ContainerType& ctype);
+ void free(const ContainerType& ctype, int index);
+ bool operator<(const ResourceLoadInfo& other)const
+ { return _resource < other._resource;}
+ bool operator==(const Resource& other)const
+ { return _resource == other;}
+ const Resource& resource()const { return _resource;}
private:
- const Resource* _resource;
+ Resource _resource;
float _load;
- std::map<const ContainerType*, ResourceInfoForContainer> _ctypes;
+ std::list<ResourceInfoForContainer> _ctypes;
};
private:
- std::map<const Resource *, ResourceLoadInfo> _resources;
+ std::list<ResourceLoadInfo> _resources;
std::list<Task*> _waitingTasks;
};
}
{
struct ContainerType
{
- float neededCores; // needed by WorkloadManager
- // parameters for client use, not needed by WorkloadManager:
+ float neededCores = 0.0; // needed by WorkloadManager
+ // parameters for client use, used by WorkloadManager to distinguish objects
std::string name;
- int id;
+ int id = 0;
+ bool operator==(const ContainerType& other)const
+ { return id == other.id && name == other.name;}
+ bool operator<(const ContainerType& other)const
+ {
+ return (id < other.id) ||
+ (id == other.id && name < other.name);
+ }
};
struct Resource
{
unsigned int nbCores; // needed by WorkloadManager
- // parameters for client use, not needed by WorkloadManager:
+ // parameters for client use, used by WorkloadManager to distinguish objects
std::string name;
int id;
+ bool operator==(const Resource& other)const
+ { return id == other.id && name == other.name;}
+ bool operator<(const Resource& other)const
+ {
+ return (id < other.id) ||
+ (id == other.id && name < other.name);
+ }
};
struct Container
{
- const ContainerType* type;
- const Resource* resource;
+ ContainerType type;
+ Resource resource;
unsigned int index; // worker index on the resource for this type
};
class Task
{
public:
- virtual const ContainerType* type()const =0;
+ virtual ~Task(){};
+ virtual const ContainerType& type()const =0;
virtual void run(const Container& c)=0;
};
}
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
//
-#include <cppunit/CompilerOutputter.h>
-#include <cppunit/TestResult.h>
-#include <cppunit/TestResultCollector.h>
-#include <cppunit/TextTestProgressListener.h>
-#include <cppunit/BriefTestProgressListener.h>
-#include <cppunit/extensions/TestFactoryRegistry.h>
-#include <cppunit/TestRunner.h>
-#include <cppunit/TextTestRunner.h>
+
+#include <cppunit/TestFixture.h>
#include <stdexcept>
#include <iostream>
class MyTask : public WorkloadManager::Task
{
public:
- const WorkloadManager::ContainerType* type()const override {return _type;}
+ const WorkloadManager::ContainerType& type()const override {return *_type;}
void run(const WorkloadManager::Container& c)override
{
_check->check(c, this);
- DEBUG_LOG("Running task ", _id, " on ", c.resource->name, "-", c.type->name,
+ DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name,
"-", c.index);
std::this_thread::sleep_for(std::chrono::seconds(_sleep));
DEBUG_LOG("Finish task ", _id);
MyTask* t)
{
std::unique_lock<std::mutex> lock(_mutex);
- int& max = _maxContainersForResource[c.resource->id][c.type->id];
+ int& max = _maxContainersForResource[c.resource.id][c.type.id];
if( max < c.index)
max = c.index;
}
for(std::size_t j=0; j < size_T; j++)
{
int max = _maxContainersForResource[i][j];
- DEBUG_LOG(resources[i].name, ", ", types[j].name, ":", max+1);
+ DEBUG_LOG(resources[i].name, ", ", types[j].name,
+ " max simultaneous runs:", max+1);
CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
global_max += types[j].neededCores * float(max+1);
}
- DEBUG_LOG(resources[i].name, " global: ", global_max);
+ DEBUG_LOG(resources[i].name, " max cores added for evry type: ", global_max);
+ // This assertion may be false if there are more resources than needed.
CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
}
}
void MyTest::atest()
{
constexpr std::size_t resourcesNumber = 2;
- constexpr std::size_t typesNumber = 2;
+ constexpr std::size_t typesNumber = 3;
Checker<resourcesNumber, typesNumber> check;
check.resources[0].nbCores = 10;
check.resources[1].nbCores = 18;
check.types[0].neededCores = 4.0;
check.types[1].neededCores = 1.0;
+ check.types[2].neededCores = 0.0; // tasks to be run with no cost
for(std::size_t i=0; i < resourcesNumber; i ++)
DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
" cores.");
- constexpr std::size_t tasksNumber = 100;
+ constexpr std::size_t tasksNumber = 150;
MyTask tasks[tasksNumber];
- for(std::size_t i = 0; i < tasksNumber / 2; i++)
- tasks[i].reset(i, &check.types[0], 2, &check);
- for(std::size_t i = tasksNumber / 2; i < tasksNumber; i++)
- tasks[i].reset(i, &check.types[1], 1, &check);
+ for(int type_id = 0; type_id < typesNumber; type_id++)
+ for(int j = type_id * tasksNumber / typesNumber;
+ j < (type_id + 1) * tasksNumber / typesNumber;
+ j++)
+ // id, ContainerType, sleep (1|2s)
+ tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check);
DEBUG_LOG("Number of tasks: ", tasksNumber);
- DEBUG_LOG("Tasks from 0 to ", tasksNumber/2, " are ", tasks[0].type()->name);
- DEBUG_LOG("Tasks from ", tasksNumber/2, " to ", tasksNumber, " are ",
- tasks[tasksNumber / 2].type()->name);
+ for(int type_id = 0; type_id < typesNumber; type_id++)
+ DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber,
+ " to ", (type_id + 1) * tasksNumber / typesNumber,
+ " are of type ", check.types[type_id].name);
WorkloadManager::DefaultAlgorithm algo;
WorkloadManager::WorkloadManager wlm(algo);
for(std::size_t i=0; i < resourcesNumber; i ++)
- wlm.addResource(&check.resources[i]);
+ wlm.addResource(check.resources[i]);
// Add 4 core tasks first
check.reset();
};
virtual void addTask(Task* t)=0;
- virtual void addResource(Resource* r)=0;
+ virtual void addResource(const Resource& r)=0;
virtual LaunchInfo chooseTask()=0;
virtual void liberate(const LaunchInfo& info)=0;
virtual bool empty()const =0;
stop();
}
- void WorkloadManager::addResource(Resource* r)
+ void WorkloadManager::addResource(const Resource& r)
{
std::unique_lock<std::mutex> lock(_data_mutex);
_algo.addResource(r);
}
_startCondition.notify_one();
_endCondition.notify_one();
- for(std::future<void>& th : _otherThreads)
- th.wait();
+ for(std::future<void>& th : _otherThreads)
+ th.wait();
}
void WorkloadManager::runTasks()
while(!threadStop)
{
std::unique_lock<std::mutex> lock(_data_mutex);
- _startCondition.wait(lock, [this] {return !_algo.empty();});
+ _startCondition.wait(lock, [this] {return !_algo.empty() || _stop;});
RunningInfo taskInfo;
while(chooseTaskToRun(taskInfo))
{
while(!threadStop)
{
std::unique_lock<std::mutex> lock(_data_mutex);
- _endCondition.wait(lock, [this] {return !_finishedTasks.empty();});
+ _endCondition.wait(lock, [this]
+ {
+ return !_finishedTasks.empty() ||
+ (_stop && _runningTasks.empty() && _algo.empty());
+ });
while(!_finishedTasks.empty())
{
RunningInfo taskInfo = _finishedTasks.front();
{
// We are already under the lock
taskInfo.id = _nextIndex;
- _nextIndex ++;
taskInfo.info = _algo.chooseTask();
+ if(taskInfo.info.taskFound)
+ _nextIndex ++;
return taskInfo.info.taskFound;
}
{
public:
WorkloadManager(WorkloadAlgorithm& algo);
+ WorkloadManager(const WorkloadManager&) = delete;
+ WorkloadManager()=delete;
~WorkloadManager();
void addTask(Task* t);
- void addResource(Resource* r);
+ void addResource(const Resource& r);
void start(); //! start execution
void stop(); //! stop execution
std::condition_variable _startCondition; // start tasks thread notification
std::condition_variable _endCondition; // end tasks thread notification
bool _stop;
- std::list< std::future<void> > _otherThreads;
+ std::vector< std::future<void> > _otherThreads;
WorkloadAlgorithm& _algo;
void runTasks();