Salome HOME
Generate an error when no resource can run a task with the workload manager.
[modules/yacs.git] / src / workloadmanager / Test / TestMain.cxx
1 // Copyright (C) 2020  EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 //
18
19 #include <cppunit/TestFixture.h>
20 #include <stdexcept>
21
22 #include <iostream>
23 #include <fstream>
24 #include <cstdlib>
25 #include <cppunit/extensions/HelperMacros.h>
26
27 #include <string>
28 #include <sstream>
29
30 #include <chrono>
31 #include <ctime>
32 #include <thread>
33
34 #include "../WorkloadManager.hxx"
35 #include "../DefaultAlgorithm.hxx"
36
37 constexpr bool ACTIVATE_DEBUG_LOG = false;
38 template<typename... Ts>
39 void DEBUG_LOG(Ts... args)
40 {
41   if(! ACTIVATE_DEBUG_LOG)
42     return;
43   if(sizeof...(Ts) == 0)
44     return;
45   std::ostringstream message;
46   // TODO:  C++17 solution: ( (message << args), ...);
47   // since initializer lists guarantee sequencing, this can be used to
48   // call a function on each element of a pack, in order:
49   int dummy[] = { (message << args, 0)...};
50   message << std::endl;
51   std::cerr << message.str();
52 }
53
54 class MyTask;
55 class AbstractChecker
56 {
57 public:
58   virtual void check(const WorkloadManager::RunInfo& c, MyTask* t)=0;
59 };
60
61 template <std::size_t size_R, std::size_t size_T>
62 class Checker : public AbstractChecker
63 {
64 public:
65   Checker();
66   void check(const WorkloadManager::RunInfo& c, MyTask* t)override;
67   void globalCheck();
68   void reset();
69
70   WorkloadManager::Resource resources[size_R];
71   WorkloadManager::ContainerType types[size_T];
72 private:
73   std::mutex _mutex;
74   int _maxContainersForResource[size_R][size_T];
75 };
76
77 class MyTask : public WorkloadManager::Task
78 {
79 public:
80   const WorkloadManager::ContainerType& type()const override {return *_type;}
81   void run(const WorkloadManager::RunInfo& c)override
82   {
83     _check->check(c, this);
84
85     DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name,
86               "-", c.index);
87     std::this_thread::sleep_for(std::chrono::seconds(_sleep));
88     DEBUG_LOG("Finish task ", _id);
89   }
90
91   void reset(int id,
92              const WorkloadManager::ContainerType* type,
93              int sleep,
94              AbstractChecker * check
95             )
96   {
97     _id = id;
98     _type = type;
99     _sleep = sleep;
100     _check = check;
101   }
102 private:
103   int _id = 0;
104   const WorkloadManager::ContainerType* _type = nullptr;
105   int _sleep = 0;
106   AbstractChecker * _check;
107 };
108
109 template <std::size_t size_R, std::size_t size_T>
110 Checker<size_R, size_T>::Checker()
111 {
112   for(std::size_t i=0; i < size_R; i ++)
113   {
114     resources[i].id = i;
115     std::ostringstream name;
116     name << "r" << i;
117     resources[i].name = name.str();
118   }
119
120   for(std::size_t i=0; i < size_T; i ++)
121   {
122     types[i].id = i;
123     std::ostringstream name;
124     name << "t" << i;
125     types[i].name = name.str();
126   }
127
128   for(std::size_t i=0; i < size_R; i++)
129     for(std::size_t j=0; j < size_T; j++)
130       _maxContainersForResource[i][j] = 0;
131 }
132
133 template <std::size_t size_R, std::size_t size_T>
134 void Checker<size_R, size_T>::check(const WorkloadManager::RunInfo& c,
135                                     MyTask* t)
136 {
137   std::unique_lock<std::mutex> lock(_mutex);
138   int& max = _maxContainersForResource[c.resource.id][c.type.id];
139   if( max < c.index)
140     max = c.index;
141 }
142
143 template <std::size_t size_R, std::size_t size_T>
144 void Checker<size_R, size_T>::globalCheck()
145 {
146   for(std::size_t i=0; i < size_R; i++)
147   {
148     float global_max = 0;
149     for(std::size_t j=0; j < size_T; j++)
150     {
151       int max = _maxContainersForResource[i][j];
152       DEBUG_LOG(resources[i].name, ", ", types[j].name,
153                 " max simultaneous runs:", max+1);
154       CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
155       global_max += types[j].neededCores * float(max+1);
156     }
157     DEBUG_LOG(resources[i].name, " max cores added for evry type: ", global_max);
158     // This assertion may be false if there are more resources than needed.
159     CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
160   }
161 }
162
163 template <std::size_t size_R, std::size_t size_T>
164 void Checker<size_R, size_T>::reset()
165 {
166   for(std::size_t i=0; i < size_R; i++)
167     for(std::size_t j=0; j < size_T; j++)
168       _maxContainersForResource[i][j] = 0;
169 }
170
171 class MyTest: public CppUnit::TestFixture
172 {
173   CPPUNIT_TEST_SUITE(MyTest);
174   CPPUNIT_TEST(atest);
175   CPPUNIT_TEST(btest);
176   CPPUNIT_TEST(ctest);
177   CPPUNIT_TEST_SUITE_END();
178 public:
179   void atest();
180   void btest(); // ignore resources
181   void ctest(); // no available resource
182 };
183
184 /**
185  * General test with 150 tasks of 3 types:
186  *   - 50 tasks which need 4 cores for 2s each
187  *   - 50 tasks which need 1 core for 1s each
188  *   - 50 tasks which need no core but take 2s each
189  * We use 2 resources: 10 cores and 18 cores
190  * We verify the global time of execution.
191  */
192 void MyTest::atest()
193 {
194   constexpr std::size_t resourcesNumber = 2;
195   constexpr std::size_t typesNumber = 3;
196   Checker<resourcesNumber, typesNumber> check;
197   check.resources[0].nbCores = 10;
198   check.resources[1].nbCores = 18;
199   check.types[0].neededCores = 4.0;
200   check.types[1].neededCores = 1.0;
201   check.types[2].neededCores = 0.0; // tasks to be run with no cost
202
203   for(std::size_t i=0; i < resourcesNumber; i ++)
204     DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
205               " cores.");
206   for(std::size_t i=0; i < typesNumber; i ++)
207     DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
208               " cores.");
209
210   constexpr std::size_t tasksNumber = 150;
211   MyTask tasks[tasksNumber];
212   for(int type_id = 0; type_id < typesNumber; type_id++)
213     for(int j = type_id * tasksNumber / typesNumber;
214         j < (type_id + 1) * tasksNumber / typesNumber;
215         j++)
216         //            id,  ContainerType,       sleep (1|2s)
217         tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check);
218
219   DEBUG_LOG("Number of tasks: ", tasksNumber);
220   for(int type_id = 0; type_id < typesNumber; type_id++)
221     DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber,
222               " to ", (type_id + 1) * tasksNumber / typesNumber,
223               " are of type ", check.types[type_id].name);
224
225   WorkloadManager::DefaultAlgorithm algo;
226   WorkloadManager::WorkloadManager wlm(algo);
227   for(std::size_t i=0; i < resourcesNumber; i ++)
228     wlm.addResource(check.resources[i]);
229
230   // Add 4 core tasks first
231   check.reset();
232   for(std::size_t i = 0; i < tasksNumber; i++)
233     wlm.addTask(&tasks[i]);
234   std::chrono::steady_clock::time_point start_time;
235   start_time = std::chrono::steady_clock::now();
236   wlm.start(); // tasks can be added before start.
237   wlm.stop();
238   std::chrono::steady_clock::time_point end_time;
239   end_time = std::chrono::steady_clock::now();
240   std::chrono::seconds duration;
241   duration = std::chrono::duration_cast<std::chrono::seconds>
242              (end_time - start_time);
243   std::chrono::seconds maxExpectedDuration(22);
244   CPPUNIT_ASSERT( duration < maxExpectedDuration );
245   DEBUG_LOG("Test step duration : ", duration.count(), "s");
246   check.globalCheck();
247
248   // Add 1 core tasks first
249   check.reset();
250   // WARNING: std::size_t is always >= 0
251   for(int i = tasksNumber-1; i >= 0; i--)
252     wlm.addTask(&tasks[i]);
253   start_time = std::chrono::steady_clock::now();
254   wlm.start(); // tasks can be added before start.
255   wlm.stop();
256   end_time = std::chrono::steady_clock::now();
257   duration = std::chrono::duration_cast<std::chrono::seconds>
258              (end_time - start_time);
259   CPPUNIT_ASSERT( duration < maxExpectedDuration );
260   DEBUG_LOG("Test step duration : ", duration.count(), "s");
261   check.globalCheck();
262
263   // Add 1 core tasks first & start before addTask
264   check.reset();
265   start_time = std::chrono::steady_clock::now();
266   wlm.start();
267   for(int i = tasksNumber-1; i >= 0; i--)
268     wlm.addTask(&tasks[i]);
269   wlm.stop();
270   end_time = std::chrono::steady_clock::now();
271   duration = std::chrono::duration_cast<std::chrono::seconds>
272              (end_time - start_time);
273   CPPUNIT_ASSERT( duration < maxExpectedDuration );
274   DEBUG_LOG("Test step duration : ", duration.count(), "s");
275   check.globalCheck();
276
277 }
278
279 /**
280  * Test the case of tasks which need no resources and can be run whithout
281  * waiting.
282  */
283 void MyTest::btest()
284 {
285   Checker<1, 1> check;
286   WorkloadManager::ContainerType ctype;
287   ctype.ignoreResources = true;
288   constexpr std::size_t tasksNumber = 20;
289   MyTask tasks[tasksNumber];
290   for(std::size_t i = 0; i < tasksNumber; i++)
291     tasks[i].reset(i, &ctype, 1, &check);
292   WorkloadManager::DefaultAlgorithm algo;
293   WorkloadManager::WorkloadManager wlm(algo);
294   // no resource needed
295   std::chrono::steady_clock::time_point start_time;
296   std::chrono::steady_clock::time_point end_time;
297   std::chrono::seconds duration;
298   start_time = std::chrono::steady_clock::now();
299   wlm.start();
300   for(std::size_t i = 0; i < tasksNumber; i++)
301     wlm.addTask(&tasks[i]);
302   wlm.stop();
303   end_time = std::chrono::steady_clock::now();
304   duration = std::chrono::duration_cast<std::chrono::seconds>
305              (end_time - start_time);
306   std::chrono::seconds maxExpectedDuration(2);
307   CPPUNIT_ASSERT( duration <= maxExpectedDuration);
308 }
309
310 /**
311  * Test the case of a task which need more cores than any resource has.
312  */
313 class ErrorTask : public WorkloadManager::Task
314 {
315 public:
316   ErrorTask(int nb_cores): WorkloadManager::Task(), _type(), _ok(), _message()
317   {
318     _type.ignoreResources = false;
319     _type.neededCores = nb_cores;
320   }
321
322   const WorkloadManager::ContainerType& type()const override {return _type;}
323
324   void run(const WorkloadManager::RunInfo& c)override
325   {
326     _ok = c.isOk;
327     _message = c.error_message;
328   }
329
330   bool checkState(bool ok, const std::string& message)
331   {
332     return (ok == _ok) && (message == _message);
333   }
334
335 private:
336   WorkloadManager::ContainerType _type;
337   bool _ok;
338   std::string _message;
339 };
340
341 void MyTest::ctest()
342 {
343   WorkloadManager::Resource r;
344   r.id = 1;
345   r.name = "r1";
346   r.nbCores = 1;
347   ErrorTask t1(1), t2(10);
348   WorkloadManager::DefaultAlgorithm algo;
349   WorkloadManager::WorkloadManager wlm(algo);
350   wlm.addResource(r);
351   wlm.addTask(&t1);
352   wlm.addTask(&t2);
353   wlm.start();
354   wlm.stop();
355   CPPUNIT_ASSERT(t1.checkState(true, ""));
356   CPPUNIT_ASSERT(t2.checkState(false, "No resource can run this task."));
357   // no error mode: wait for a resource to be added
358   WorkloadManager::DefaultAlgorithm algo_noerror;
359   WorkloadManager::WorkloadManager wlm2(algo_noerror);
360   wlm2.addResource(r);
361   wlm2.addTask(&t1);
362   wlm2.addTask(&t2);
363   wlm2.start();
364   std::this_thread::sleep_for(std::chrono::seconds(1));
365   r.id = 2;
366   r.name = "r2";
367   r.nbCores = 20;
368   wlm2.addResource(r);
369   wlm2.stop();
370   CPPUNIT_ASSERT(t1.checkState(true, ""));
371   CPPUNIT_ASSERT(t2.checkState(true, ""));
372 }
373
374 CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
375
376 #include "BasicMainTest.hxx"