wakeUp();
}
+void Executor::failTask(Task *task, const std::string& message)
+{
+ ElementaryNode* elemNode = dynamic_cast<ElementaryNode*>(task);
+ if(elemNode != nullptr)
+ {
+ StateLoader(elemNode, YACS::ERROR);
+ elemNode->setErrorDetails(message);
+ }
+ endTask(task, YACS::ABORT);
+}
+
YACS::Event Executor::runTask(Task *task)
{
{ // --- Critical section
void makeDatastreamConnections(Task *task);
void beginTask(Task *task);
void endTask(Task *task, YACS::Event ev);
+ void failTask(Task *task, const std::string& message);
////////////
protected:
bool checkBreakPoints();
void WlmTask::run(const WorkloadManager::RunInfo& runInfo)
{
- _executor.loadTask(_yacsTask, runInfo);
- _executor.makeDatastreamConnections(_yacsTask);
- YACS::Event ev = _executor.runTask(_yacsTask);
- _executor.endTask(_yacsTask, ev);
+ if(runInfo.isOk)
+ {
+ _executor.loadTask(_yacsTask, runInfo);
+ _executor.makeDatastreamConnections(_yacsTask);
+ YACS::Event ev = _executor.runTask(_yacsTask);
+ _executor.endTask(_yacsTask, ev);
+ }
+ else
+ {
+ _executor.failTask(_yacsTask, runInfo.error_message);
+ }
delete this; // provisoire
}
newResource.nbCores = res.second;
wm.addResource(newResource);
}
+ wm.freezeResources();
}
}
if(itResource->isSupported(ctype)
&& (*itTask)->isAccepted(itResource->resource()))
{
+ isSupported = true;
if(itResource->isAllocPossible(ctype))
{
float thisCost = itResource->cost(ctype);
result.worker.resource = best_resource->resource();
result.worker.index = best_resource->alloc(ctype);
}
- else if(!isSupported)
+ else if(!isSupported && _resourcesFrozen)
{
- // TODO: This task can never be run by any available resource.
+ // This task can never be run by any available resource.
+ result.taskFound = true;
+ result.worker.isOk = false;
+ result.worker.error_message = "No resource can run this task.";
}
}
if(result.taskFound)
void DefaultAlgorithm::liberate(const LaunchInfo& info)
{
const ContainerType& ctype = info.worker.type;
- if(!ctype.ignoreResources)
+ if(!ctype.ignoreResources && info.worker.isOk)
{
const Resource& r = info.worker.resource;
unsigned int index = info.worker.index;
LaunchInfo chooseTask()override;
void liberate(const LaunchInfo& info)override;
bool empty()const override;
+ void freezeResources() override { _resourcesFrozen = true;}
// ----------------------------- PRIVATE ----------------------------- //
private:
private:
std::list<ResourceLoadInfo> _resources;
std::list<Task*> _waitingTasks;
+ bool _resourcesFrozen = false;
};
}
#endif // ALGORITHMIMPLEMENT_H
ContainerType type;
Resource resource;
unsigned int index=0; // worker index on the resource for this type
+ bool isOk = true;
+ std::string error_message = "";
};
/**
CPPUNIT_TEST_SUITE(MyTest);
CPPUNIT_TEST(atest);
CPPUNIT_TEST(btest);
+ CPPUNIT_TEST(ctest);
CPPUNIT_TEST_SUITE_END();
public:
void atest();
void btest(); // ignore resources
+ void ctest(); // no available resource
};
/**
CPPUNIT_ASSERT( duration <= maxExpectedDuration);
}
+/**
+ * Test the case of a task which need more cores than any resource has.
+ */
+class ErrorTask : public WorkloadManager::Task
+{
+public:
+ ErrorTask(int nb_cores): WorkloadManager::Task(), _type(), _ok(), _message()
+ {
+ _type.ignoreResources = false;
+ _type.neededCores = nb_cores;
+ }
+
+ const WorkloadManager::ContainerType& type()const override {return _type;}
+
+ void run(const WorkloadManager::RunInfo& c)override
+ {
+ _ok = c.isOk;
+ _message = c.error_message;
+ }
+
+ bool checkState(bool ok, const std::string& message)
+ {
+ return (ok == _ok) && (message == _message);
+ }
+
+private:
+ WorkloadManager::ContainerType _type;
+ bool _ok;
+ std::string _message;
+};
+
+void MyTest::ctest()
+{
+ WorkloadManager::Resource r;
+ r.id = 1;
+ r.name = "r1";
+ r.nbCores = 1;
+ ErrorTask t1(1), t2(10);
+ WorkloadManager::DefaultAlgorithm algo;
+ WorkloadManager::WorkloadManager wlm(algo);
+ wlm.addResource(r);
+ wlm.addTask(&t1);
+ wlm.addTask(&t2);
+ wlm.start();
+ wlm.stop();
+ CPPUNIT_ASSERT(t1.checkState(true, ""));
+ CPPUNIT_ASSERT(t2.checkState(false, "No resource can run this task."));
+ // no error mode: wait for a resource to be added
+ WorkloadManager::DefaultAlgorithm algo_noerror;
+ WorkloadManager::WorkloadManager wlm2(algo_noerror);
+ wlm2.addResource(r);
+ wlm2.addTask(&t1);
+ wlm2.addTask(&t2);
+ wlm2.start();
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ r.id = 2;
+ r.name = "r2";
+ r.nbCores = 20;
+ wlm2.addResource(r);
+ wlm2.stop();
+ CPPUNIT_ASSERT(t1.checkState(true, ""));
+ CPPUNIT_ASSERT(t2.checkState(true, ""));
+}
+
CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
#include "BasicMainTest.hxx"
virtual LaunchInfo chooseTask()=0;
virtual void liberate(const LaunchInfo& info)=0;
virtual bool empty()const =0;
+ virtual void freezeResources()=0;
};
}
#endif // WORKLOADALGORITHM_H
, _data_mutex()
, _startCondition()
, _endCondition()
- , _stop(false)
+ , _stop(true)
, _otherThreads()
, _algo(algo)
{
_startCondition.notify_one();
}
+ void WorkloadManager::freezeResources()
+ {
+ std::unique_lock<std::mutex> lock(_data_mutex);
+ _algo.freezeResources();
+ _startCondition.notify_one();
+ }
+
void WorkloadManager::addTask(Task* t)
{
std::unique_lock<std::mutex> lock(_data_mutex);
{
{
std::unique_lock<std::mutex> lock(_data_mutex);
+ if(!_stop)
+ return; // already started
_stop = false;
}
_otherThreads.emplace_back(std::async(std::launch::async, [this]
{
std::unique_lock<std::mutex> lock(_data_mutex);
_stop = true;
+ _algo.freezeResources();
}
_startCondition.notify_one();
_endCondition.notify_one();
~WorkloadManager();
void addTask(Task* t);
void addResource(const Resource& r);
+ void freezeResources(); //! no more resources can be added
void start(); //! start execution
void stop(); //! stop execution
err_message = proc.getChildByName("End").getOutputPort("err_message").getPyObj()
self.fail(err_message)
+ def test4(self):
+ """ Verify the execution is stoped if no resource can run a task.
+ """
+ proc = self.l.load("samples/wlm_error.xml")
+ self.e.RunW(proc,0)
+ self.assertEqual(proc.getState(),pilot.FAILED)
+ self.assertEqual(proc.getChildByName("ErrorNode").getState(),pilot.ERROR)
+
if __name__ == '__main__':
dir_test = tempfile.mkdtemp(suffix=".yacstest")
file_test = os.path.join(dir_test,"UnitTestsResult")