testSaveLoadRun fails because of side effects between test cases.
#include "ServiceNode.hxx"
#include "ComposedNode.hxx"
+#include "workloadmanager/WorkloadManager.hxx"
+#include "workloadmanager/DefaultAlgorithm.hxx"
+
#include <iostream>
#include <fstream>
#include <sys/stat.h>
}
///////// NEW EXECUTOR ////////////////////////////////
-void Executor::loadTask(Task *task)
+void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
{
if(task->getState() != YACS::TOLOAD)
return;
}//End of critical section
try
{
+ std::ostringstream container_name;
+ container_name << runInfo.resource.name << "-"
+ << runInfo.type.name << "-" << runInfo.index;
+ task->imposeResource(runInfo.resource.name, container_name.str());
traceExec(task, "load", ComputePlacement(task));
task->load();
traceExec(task, "initService", ComputePlacement(task));
traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
}
-#include "workloadmanager/WorkloadManager.hxx"
-#include "workloadmanager/DefaultAlgorithm.hxx"
#include "Runtime.hxx"
static
void loadResources(WorkloadManager::WorkloadManager& wm)
public:
NewTask(Executor& executor, YACS::ENGINE::Task* yacsTask);
const WorkloadManager::ContainerType& type()const override;
- void run(const WorkloadManager::Container& c)override;
+ void run(const WorkloadManager::RunInfo& runInfo)override;
private:
WorkloadManager::ContainerType _type;
Executor& _executor;
, _executor(executor)
, _yacsTask(yacsTask)
{
- _type.neededCores = 0;
+ Container * yacsContainer = yacsTask->getContainer();
+ if(yacsContainer != nullptr && !yacsTask->canAcceptImposedResource())
+ {
+ _type.ignoreResources = false;
+ _type.name = yacsContainer->getName();
+ _type.neededCores = 1; // TODO: use the actual value
+ }
+ else
+ {
+ _type.ignoreResources = true;
+ _type.name = "test";
+ _type.neededCores = 0;
+ }
_type.id = 0;
- _type.name = "test";
}
const WorkloadManager::ContainerType& NewTask::type()const
return _type;
}
-void NewTask::run(const WorkloadManager::Container& c)
+void NewTask::run(const WorkloadManager::RunInfo& runInfo)
{
- _executor.loadTask(_yacsTask);
+ _executor.loadTask(_yacsTask, runInfo);
_executor.makeDatastreamConnections(_yacsTask);
YACS::Event ev = _executor.runTask(_yacsTask);
_executor.endTask(_yacsTask, ev);
#include <ctime>
#include <chrono>
+namespace WorkloadManager
+{
+ struct RunInfo;
+}
+
namespace YACS
{
namespace ENGINE
static size_t _threadStackSize;
YACS::BASES::Mutex& getTheMutexForSchedulerUpdate() { return _mutexForSchedulerUpdate; }
///// new executor !!!!!
- void loadTask(Task *task);
+ void loadTask(Task *task, const WorkloadManager::RunInfo& runInfo);
YACS::Event runTask(Task *task);
void makeDatastreamConnections(Task *task);
void beginTask(Task *task);
#include "YACSlibEngineExport.hxx"
#include "define.hxx"
#include <set>
+#include <string>
namespace YACS
{
virtual YACS::StatesForNode getState() const = 0;
virtual void finished() = 0;
virtual void aborted() = 0;
+ virtual void imposeResource(const std::string& resource_name,
+ const std::string& container_name) {}
+ virtual bool canAcceptImposedResource() { return false;}
virtual ~Task();
};
}
itTask ++)
{
const ContainerType& ctype = (*itTask)->type();
- std::list<ResourceLoadInfo>::iterator best_resource;
- best_resource = _resources.end();
- float best_cost = std::numeric_limits<float>::max();
- bool isSupported = false;
- for(auto itResource = _resources.begin();
- itResource != _resources.end();
- itResource++)
- if(itResource->isSupported(ctype))
- {
- if(itResource->isAllocPossible(ctype))
+ if(ctype.ignoreResources)
+ result.taskFound = true;
+ else
+ {
+ std::list<ResourceLoadInfo>::iterator best_resource;
+ best_resource = _resources.end();
+ float best_cost = std::numeric_limits<float>::max();
+ bool isSupported = false;
+ for(auto itResource = _resources.begin();
+ itResource != _resources.end();
+ itResource++)
+ if(itResource->isSupported(ctype)
+ && (*itTask)->isAccepted(itResource->resource()))
{
- float thisCost = itResource->cost(ctype);
- if( best_cost > thisCost)
+ if(itResource->isAllocPossible(ctype))
{
- best_cost = thisCost;
- best_resource = itResource;
+ float thisCost = itResource->cost(ctype);
+ if( best_cost > thisCost)
+ {
+ best_cost = thisCost;
+ best_resource = itResource;
+ }
}
}
+ if(best_resource != _resources.end())
+ {
+ result.taskFound = true;
+ result.worker.resource = best_resource->resource();
+ result.worker.index = best_resource->alloc(ctype);
+ }
+ else if(!isSupported)
+ {
+ // TODO: This task can never be run by any available resource.
}
- if(best_resource != _resources.end())
+ }
+ if(result.taskFound)
{
chosenTaskIt = itTask;
result.task = (*itTask);
- result.taskFound = true;
- result.worker.resource = best_resource->resource();
result.worker.type = ctype;
- result.worker.index = best_resource->alloc(ctype);
- }
- else if(!isSupported)
- {
- // TODO: This task can never be run by any available resource.
}
}
if(result.taskFound)
void DefaultAlgorithm::liberate(const LaunchInfo& info)
{
- const Resource& r = info.worker.resource;
- unsigned int index = info.worker.index;
const ContainerType& ctype = info.worker.type;
- std::list<ResourceLoadInfo>::iterator it = std::find(_resources.begin(),
- _resources.end(),
- r);
- it->free(ctype, index); // we are sure to find it
+ if(!ctype.ignoreResources)
+ {
+ const Resource& r = info.worker.resource;
+ unsigned int index = info.worker.index;
+ std::list<ResourceLoadInfo>::iterator it = std::find(_resources.begin(),
+ _resources.end(),
+ r);
+ it->free(ctype, index); // we are sure to find it
+ }
}
// ResourceInfoForContainer
{
struct ContainerType
{
- float neededCores = 0.0; // needed by WorkloadManager
+ // parameters needed by WorkloadManager
+ float neededCores = 0.0;
+ bool ignoreResources = false; // if true, the task can be run as soon as
+ // added to the manager without any resource
+ // allocation
// parameters for client use, used by WorkloadManager to distinguish objects
std::string name;
int id = 0;
struct Resource
{
- unsigned int nbCores; // needed by WorkloadManager
+ unsigned int nbCores = 0; // needed by WorkloadManager
// parameters for client use, used by WorkloadManager to distinguish objects
std::string name;
- int id;
+ int id = 0;
bool operator==(const Resource& other)const
{ return id == other.id && name == other.name;}
bool operator<(const Resource& other)const
}
};
- struct Container
+ struct RunInfo
{
ContainerType type;
Resource resource;
public:
virtual ~Task(){};
virtual const ContainerType& type()const =0;
- virtual void run(const Container& c)=0;
+ virtual void run(const RunInfo& c)=0;
+
+ // Is it possible to run the task on this resource?
+ virtual bool isAccepted(const Resource& r)
+ {
+ // by default, a task can be run on any resource.
+ return true;
+ }
};
}
class AbstractChecker
{
public:
- virtual void check(const WorkloadManager::Container& c, MyTask* t)=0;
+ virtual void check(const WorkloadManager::RunInfo& c, MyTask* t)=0;
};
template <std::size_t size_R, std::size_t size_T>
{
public:
Checker();
- void check(const WorkloadManager::Container& c, MyTask* t)override;
+ void check(const WorkloadManager::RunInfo& c, MyTask* t)override;
void globalCheck();
void reset();
{
public:
const WorkloadManager::ContainerType& type()const override {return *_type;}
- void run(const WorkloadManager::Container& c)override
+ void run(const WorkloadManager::RunInfo& c)override
{
_check->check(c, this);
}
template <std::size_t size_R, std::size_t size_T>
-void Checker<size_R, size_T>::check(const WorkloadManager::Container& c,
+void Checker<size_R, size_T>::check(const WorkloadManager::RunInfo& c,
MyTask* t)
{
std::unique_lock<std::mutex> lock(_mutex);
{
CPPUNIT_TEST_SUITE(MyTest);
CPPUNIT_TEST(atest);
+ CPPUNIT_TEST(btest);
CPPUNIT_TEST_SUITE_END();
public:
void atest();
+ void btest(); // ignore resources
};
void MyTest::atest()
}
+void MyTest::btest()
+{
+ Checker<1, 1> check;
+ WorkloadManager::ContainerType ctype;
+ ctype.ignoreResources = true;
+ constexpr std::size_t tasksNumber = 20;
+ MyTask tasks[tasksNumber];
+ for(std::size_t i = 0; i < tasksNumber; i++)
+ tasks[i].reset(i, &ctype, 1, &check);
+ WorkloadManager::DefaultAlgorithm algo;
+ WorkloadManager::WorkloadManager wlm(algo);
+ // no resource needed
+ std::chrono::steady_clock::time_point start_time;
+ std::chrono::steady_clock::time_point end_time;
+ std::chrono::seconds duration;
+ start_time = std::chrono::steady_clock::now();
+ wlm.start();
+ for(std::size_t i = 0; i < tasksNumber; i++)
+ wlm.addTask(&tasks[i]);
+ wlm.stop();
+ end_time = std::chrono::steady_clock::now();
+ duration = std::chrono::duration_cast<std::chrono::seconds>
+ (end_time - start_time);
+ std::chrono::seconds maxExpectedDuration(2);
+ CPPUNIT_ASSERT( duration <= maxExpectedDuration);
+}
+
CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
#include "BasicMainTest.hxx"
struct LaunchInfo
{
bool taskFound=false;
- Container worker;
+ RunInfo worker;
Task* task=nullptr;
};
//
#include "WorkloadManager.hxx"
#include "Task.hxx"
-//#include "Container.hxx"
namespace WorkloadManager
{
ex.RunW(p,0)
self.assertEqual(p.getState(),pilot.DONE)
+ self.skipTest("Skip HPContainertest")
# run remote
p=l.load(fname)
pg=pilot.PlayGround()