Salome HOME
eaf661e70ec646bd93930b100ff5f92fbc342146
[modules/yacs.git] / src / workloadmanager / Test / TestMain.cxx
1 // Copyright (C) 2020-2024  CEA, EDF
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include <cppunit/TestFixture.h>
21 #include <stdexcept>
22
23 #include <iostream>
24 #include <fstream>
25 #include <cstdlib>
26 #include <cppunit/extensions/HelperMacros.h>
27
28 #include <string>
29 #include <sstream>
30
31 #include <chrono>
32 #include <ctime>
33 #include <thread>
34
35 #include "../WorkloadManager.hxx"
36 #include "../DefaultAlgorithm.hxx"
37
38 constexpr bool ACTIVATE_DEBUG_LOG = false;
39 template<typename... Ts>
40 void DEBUG_LOG(Ts... args)
41 {
42   if(! ACTIVATE_DEBUG_LOG)
43     return;
44   if(sizeof...(Ts) == 0)
45     return;
46   std::ostringstream message;
47   // TODO:  C++17 solution: ( (message << args), ...);
48   // since initializer lists guarantee sequencing, this can be used to
49   // call a function on each element of a pack, in order:
50   int dummy[] = { (message << args, 0)...};
51   message << std::endl;
52   std::cerr << message.str();
53 }
54
55 class MyTask;
56 class AbstractChecker
57 {
58 public:
59   virtual void check(const WorkloadManager::RunInfo& c, MyTask* t)=0;
60 };
61
62 template <std::size_t size_R, std::size_t size_T>
63 class Checker : public AbstractChecker
64 {
65 public:
66   Checker();
67   void check(const WorkloadManager::RunInfo& c, MyTask* t)override;
68   void globalCheck();
69   void reset();
70
71   WorkloadManager::Resource resources[size_R];
72   WorkloadManager::ContainerType types[size_T];
73 private:
74   std::mutex _mutex;
75   int _maxContainersForResource[size_R][size_T];
76 };
77
78 class MyTask : public WorkloadManager::Task
79 {
80 public:
81   const WorkloadManager::ContainerType& type()const override {return *_type;}
82   void run(const WorkloadManager::RunInfo& c)override
83   {
84     _check->check(c, this);
85
86     DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name,
87               "-", c.index);
88     std::this_thread::sleep_for(std::chrono::seconds(_sleep));
89     DEBUG_LOG("Finish task ", _id);
90   }
91
92   void reset(int id,
93              const WorkloadManager::ContainerType* type,
94              int sleep,
95              AbstractChecker * check
96             )
97   {
98     _id = id;
99     _type = type;
100     _sleep = sleep;
101     _check = check;
102   }
103 private:
104   int _id = 0;
105   const WorkloadManager::ContainerType* _type = nullptr;
106   int _sleep = 0;
107   AbstractChecker * _check;
108 };
109
110 template <std::size_t size_R, std::size_t size_T>
111 Checker<size_R, size_T>::Checker()
112 {
113   for(std::size_t i=0; i < size_R; i ++)
114   {
115     resources[i].id = i;
116     std::ostringstream name;
117     name << "r" << i;
118     resources[i].name = name.str();
119   }
120
121   for(std::size_t i=0; i < size_T; i ++)
122   {
123     types[i].id = i;
124     std::ostringstream name;
125     name << "t" << i;
126     types[i].name = name.str();
127   }
128
129   for(std::size_t i=0; i < size_R; i++)
130     for(std::size_t j=0; j < size_T; j++)
131       _maxContainersForResource[i][j] = 0;
132 }
133
134 template <std::size_t size_R, std::size_t size_T>
135 void Checker<size_R, size_T>::check(const WorkloadManager::RunInfo& c,
136                                     MyTask* t)
137 {
138   std::unique_lock<std::mutex> lock(_mutex);
139   int& max = _maxContainersForResource[c.resource.id][c.type.id];
140   if( max < c.index)
141     max = c.index;
142 }
143
144 template <std::size_t size_R, std::size_t size_T>
145 void Checker<size_R, size_T>::globalCheck()
146 {
147   for(std::size_t i=0; i < size_R; i++)
148   {
149     float global_max = 0;
150     for(std::size_t j=0; j < size_T; j++)
151     {
152       int max = _maxContainersForResource[i][j];
153       DEBUG_LOG(resources[i].name, ", ", types[j].name,
154                 " max simultaneous runs:", max+1);
155       CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
156       global_max += types[j].neededCores * float(max+1);
157     }
158     DEBUG_LOG(resources[i].name, " max cores added for evry type: ", global_max);
159     // This assertion may be false if there are more resources than needed.
160     CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
161   }
162 }
163
164 template <std::size_t size_R, std::size_t size_T>
165 void Checker<size_R, size_T>::reset()
166 {
167   for(std::size_t i=0; i < size_R; i++)
168     for(std::size_t j=0; j < size_T; j++)
169       _maxContainersForResource[i][j] = 0;
170 }
171
172 class MyTest: public CppUnit::TestFixture
173 {
174   CPPUNIT_TEST_SUITE(MyTest);
175   CPPUNIT_TEST(atest);
176   CPPUNIT_TEST(btest);
177   CPPUNIT_TEST(ctest);
178   CPPUNIT_TEST_SUITE_END();
179 public:
180   void atest();
181   void btest(); // ignore resources
182   void ctest(); // no available resource
183 };
184
185 /**
186  * General test with 150 tasks of 3 types:
187  *   - 50 tasks which need 4 cores for 2s each
188  *   - 50 tasks which need 1 core for 1s each
189  *   - 50 tasks which need no core but take 2s each
190  * We use 2 resources: 10 cores and 18 cores
191  * We verify the global time of execution.
192  */
193 void MyTest::atest()
194 {
195   constexpr std::size_t resourcesNumber = 2;
196   constexpr std::size_t typesNumber = 3;
197   Checker<resourcesNumber, typesNumber> check;
198   check.resources[0].nbCores = 10;
199   check.resources[1].nbCores = 18;
200   check.types[0].neededCores = 4.0;
201   check.types[1].neededCores = 1.0;
202   check.types[2].neededCores = 0.0; // tasks to be run with no cost
203
204   for(std::size_t i=0; i < resourcesNumber; i ++)
205     DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
206               " cores.");
207   for(std::size_t i=0; i < typesNumber; i ++)
208     DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
209               " cores.");
210
211   constexpr std::size_t tasksNumber = 150;
212   MyTask tasks[tasksNumber];
213   for(int type_id = 0; type_id < typesNumber; type_id++)
214     for(int j = type_id * tasksNumber / typesNumber;
215         j < (type_id + 1) * tasksNumber / typesNumber;
216         j++)
217         //            id,  ContainerType,       sleep (1|2s)
218         tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check);
219
220   DEBUG_LOG("Number of tasks: ", tasksNumber);
221   for(int type_id = 0; type_id < typesNumber; type_id++)
222     DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber,
223               " to ", (type_id + 1) * tasksNumber / typesNumber,
224               " are of type ", check.types[type_id].name);
225
226   WorkloadManager::DefaultAlgorithm algo;
227   WorkloadManager::WorkloadManager wlm(algo);
228   for(std::size_t i=0; i < resourcesNumber; i ++)
229     wlm.addResource(check.resources[i]);
230
231   // Add 4 core tasks first
232   check.reset();
233   for(std::size_t i = 0; i < tasksNumber; i++)
234     wlm.addTask(&tasks[i]);
235   std::chrono::steady_clock::time_point start_time;
236   start_time = std::chrono::steady_clock::now();
237   wlm.start(); // tasks can be added before start.
238   wlm.stop();
239   std::chrono::steady_clock::time_point end_time;
240   end_time = std::chrono::steady_clock::now();
241   std::chrono::seconds duration;
242   duration = std::chrono::duration_cast<std::chrono::seconds>
243              (end_time - start_time);
244   std::chrono::seconds maxExpectedDuration(22);
245   CPPUNIT_ASSERT( duration < maxExpectedDuration );
246   DEBUG_LOG("Test step duration : ", duration.count(), "s");
247   check.globalCheck();
248
249   // Add 1 core tasks first
250   check.reset();
251   // WARNING: std::size_t is always >= 0
252   for(int i = tasksNumber-1; i >= 0; i--)
253     wlm.addTask(&tasks[i]);
254   start_time = std::chrono::steady_clock::now();
255   wlm.start(); // tasks can be added before start.
256   wlm.stop();
257   end_time = std::chrono::steady_clock::now();
258   duration = std::chrono::duration_cast<std::chrono::seconds>
259              (end_time - start_time);
260   CPPUNIT_ASSERT( duration < maxExpectedDuration );
261   DEBUG_LOG("Test step duration : ", duration.count(), "s");
262   check.globalCheck();
263
264   // Add 1 core tasks first & start before addTask
265   check.reset();
266   start_time = std::chrono::steady_clock::now();
267   wlm.start();
268   for(int i = tasksNumber-1; i >= 0; i--)
269     wlm.addTask(&tasks[i]);
270   wlm.stop();
271   end_time = std::chrono::steady_clock::now();
272   duration = std::chrono::duration_cast<std::chrono::seconds>
273              (end_time - start_time);
274   CPPUNIT_ASSERT( duration < maxExpectedDuration );
275   DEBUG_LOG("Test step duration : ", duration.count(), "s");
276   check.globalCheck();
277
278 }
279
280 /**
281  * Test the case of tasks which need no resources and can be run whithout
282  * waiting.
283  */
284 void MyTest::btest()
285 {
286   Checker<1, 1> check;
287   WorkloadManager::ContainerType ctype;
288   ctype.ignoreResources = true;
289   constexpr std::size_t tasksNumber = 20;
290   MyTask tasks[tasksNumber];
291   for(std::size_t i = 0; i < tasksNumber; i++)
292     tasks[i].reset(i, &ctype, 1, &check);
293   WorkloadManager::DefaultAlgorithm algo;
294   WorkloadManager::WorkloadManager wlm(algo);
295   // no resource needed
296   std::chrono::steady_clock::time_point start_time;
297   std::chrono::steady_clock::time_point end_time;
298   std::chrono::seconds duration;
299   start_time = std::chrono::steady_clock::now();
300   wlm.start();
301   for(std::size_t i = 0; i < tasksNumber; i++)
302     wlm.addTask(&tasks[i]);
303   wlm.stop();
304   end_time = std::chrono::steady_clock::now();
305   duration = std::chrono::duration_cast<std::chrono::seconds>
306              (end_time - start_time);
307   std::chrono::seconds maxExpectedDuration(2);
308   CPPUNIT_ASSERT( duration <= maxExpectedDuration);
309 }
310
311 /**
312  * Test the case of a task which need more cores than any resource has.
313  */
314 class ErrorTask : public WorkloadManager::Task
315 {
316 public:
317   ErrorTask(int nb_cores): WorkloadManager::Task(), _type(), _ok(), _message()
318   {
319     _type.ignoreResources = false;
320     _type.neededCores = nb_cores;
321   }
322
323   const WorkloadManager::ContainerType& type()const override {return _type;}
324
325   void run(const WorkloadManager::RunInfo& c)override
326   {
327     _ok = c.isOk;
328     _message = c.error_message;
329   }
330
331   bool checkState(bool ok, const std::string& message)
332   {
333     return (ok == _ok) && (message == _message);
334   }
335
336 private:
337   WorkloadManager::ContainerType _type;
338   bool _ok;
339   std::string _message;
340 };
341
342 void MyTest::ctest()
343 {
344   WorkloadManager::Resource r;
345   r.id = 1;
346   r.name = "r1";
347   r.nbCores = 1;
348   ErrorTask t1(1), t2(10);
349   WorkloadManager::DefaultAlgorithm algo;
350   WorkloadManager::WorkloadManager wlm(algo);
351   wlm.addResource(r);
352   wlm.addTask(&t1);
353   wlm.addTask(&t2);
354   wlm.start();
355   wlm.stop();
356   CPPUNIT_ASSERT(t1.checkState(true, ""));
357   CPPUNIT_ASSERT(t2.checkState(false, "No resource can run this task."));
358   // no error mode: wait for a resource to be added
359   WorkloadManager::DefaultAlgorithm algo_noerror;
360   WorkloadManager::WorkloadManager wlm2(algo_noerror);
361   wlm2.addResource(r);
362   wlm2.addTask(&t1);
363   wlm2.addTask(&t2);
364   wlm2.start();
365   std::this_thread::sleep_for(std::chrono::seconds(1));
366   r.id = 2;
367   r.name = "r2";
368   r.nbCores = 20;
369   wlm2.addResource(r);
370   wlm2.stop();
371   CPPUNIT_ASSERT(t1.checkState(true, ""));
372   CPPUNIT_ASSERT(t2.checkState(true, ""));
373 }
374
375 CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
376
377 #include "BasicMainTest.hxx"