]> SALOME platform Git repositories - modules/yacs.git/blob - src/workloadmanager/Test/TestMain.cxx
Salome HOME
Work in progress : workload manager uses nb_parallel_procs.
[modules/yacs.git] / src / workloadmanager / Test / TestMain.cxx
1 // Copyright (C) 2020  EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 //
18
19 #include <cppunit/TestFixture.h>
20 #include <stdexcept>
21
22 #include <iostream>
23 #include <fstream>
24 #include <cstdlib>
25 #include <cppunit/extensions/HelperMacros.h>
26
27 #include <string>
28 #include <sstream>
29
30 #include <chrono>
31 #include <ctime>
32 #include <thread>
33
34 #include "../WorkloadManager.hxx"
35 #include "../DefaultAlgorithm.hxx"
36
37 constexpr bool ACTIVATE_DEBUG_LOG = false;
38 template<typename... Ts>
39 void DEBUG_LOG(Ts... args)
40 {
41   if(! ACTIVATE_DEBUG_LOG)
42     return;
43   if(sizeof...(Ts) == 0)
44     return;
45   std::ostringstream message;
46   // TODO:  C++17 solution: ( (message << args), ...);
47   // since initializer lists guarantee sequencing, this can be used to
48   // call a function on each element of a pack, in order:
49   int dummy[] = { (message << args, 0)...};
50   message << std::endl;
51   std::cerr << message.str();
52 }
53
54 class MyTask;
55 class AbstractChecker
56 {
57 public:
58   virtual void check(const WorkloadManager::RunInfo& c, MyTask* t)=0;
59 };
60
61 template <std::size_t size_R, std::size_t size_T>
62 class Checker : public AbstractChecker
63 {
64 public:
65   Checker();
66   void check(const WorkloadManager::RunInfo& c, MyTask* t)override;
67   void globalCheck();
68   void reset();
69
70   WorkloadManager::Resource resources[size_R];
71   WorkloadManager::ContainerType types[size_T];
72 private:
73   std::mutex _mutex;
74   int _maxContainersForResource[size_R][size_T];
75 };
76
77 class MyTask : public WorkloadManager::Task
78 {
79 public:
80   const WorkloadManager::ContainerType& type()const override {return *_type;}
81   void run(const WorkloadManager::RunInfo& c)override
82   {
83     _check->check(c, this);
84
85     DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name,
86               "-", c.index);
87     std::this_thread::sleep_for(std::chrono::seconds(_sleep));
88     DEBUG_LOG("Finish task ", _id);
89   }
90
91   void reset(int id,
92              const WorkloadManager::ContainerType* type,
93              int sleep,
94              AbstractChecker * check
95             )
96   {
97     _id = id;
98     _type = type;
99     _sleep = sleep;
100     _check = check;
101   }
102 private:
103   int _id = 0;
104   const WorkloadManager::ContainerType* _type = nullptr;
105   int _sleep = 0;
106   AbstractChecker * _check;
107 };
108
109 template <std::size_t size_R, std::size_t size_T>
110 Checker<size_R, size_T>::Checker()
111 {
112   for(std::size_t i=0; i < size_R; i ++)
113   {
114     resources[i].id = i;
115     std::ostringstream name;
116     name << "r" << i;
117     resources[i].name = name.str();
118   }
119
120   for(std::size_t i=0; i < size_T; i ++)
121   {
122     types[i].id = i;
123     std::ostringstream name;
124     name << "t" << i;
125     types[i].name = name.str();
126   }
127
128   for(std::size_t i=0; i < size_R; i++)
129     for(std::size_t j=0; j < size_T; j++)
130       _maxContainersForResource[i][j] = 0;
131 }
132
133 template <std::size_t size_R, std::size_t size_T>
134 void Checker<size_R, size_T>::check(const WorkloadManager::RunInfo& c,
135                                     MyTask* t)
136 {
137   std::unique_lock<std::mutex> lock(_mutex);
138   int& max = _maxContainersForResource[c.resource.id][c.type.id];
139   if( max < c.index)
140     max = c.index;
141 }
142
143 template <std::size_t size_R, std::size_t size_T>
144 void Checker<size_R, size_T>::globalCheck()
145 {
146   for(std::size_t i=0; i < size_R; i++)
147   {
148     float global_max = 0;
149     for(std::size_t j=0; j < size_T; j++)
150     {
151       int max = _maxContainersForResource[i][j];
152       DEBUG_LOG(resources[i].name, ", ", types[j].name,
153                 " max simultaneous runs:", max+1);
154       CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
155       global_max += types[j].neededCores * float(max+1);
156     }
157     DEBUG_LOG(resources[i].name, " max cores added for evry type: ", global_max);
158     // This assertion may be false if there are more resources than needed.
159     CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
160   }
161 }
162
163 template <std::size_t size_R, std::size_t size_T>
164 void Checker<size_R, size_T>::reset()
165 {
166   for(std::size_t i=0; i < size_R; i++)
167     for(std::size_t j=0; j < size_T; j++)
168       _maxContainersForResource[i][j] = 0;
169 }
170
171 class MyTest: public CppUnit::TestFixture
172 {
173   CPPUNIT_TEST_SUITE(MyTest);
174   CPPUNIT_TEST(atest);
175   CPPUNIT_TEST(btest);
176   CPPUNIT_TEST_SUITE_END();
177 public:
178   void atest();
179   void btest(); // ignore resources
180 };
181
182 /**
183  * General test with 150 tasks of 3 types:
184  *   - 50 tasks which need 4 cores for 2s each
185  *   - 50 tasks which need 1 core for 1s each
186  *   - 50 tasks which need no core but take 2s each
187  * We use 2 resources: 10 cores and 18 cores
188  * We verify the global time of execution.
189  */
190 void MyTest::atest()
191 {
192   constexpr std::size_t resourcesNumber = 2;
193   constexpr std::size_t typesNumber = 3;
194   Checker<resourcesNumber, typesNumber> check;
195   check.resources[0].nbCores = 10;
196   check.resources[1].nbCores = 18;
197   check.types[0].neededCores = 4.0;
198   check.types[1].neededCores = 1.0;
199   check.types[2].neededCores = 0.0; // tasks to be run with no cost
200
201   for(std::size_t i=0; i < resourcesNumber; i ++)
202     DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
203               " cores.");
204   for(std::size_t i=0; i < typesNumber; i ++)
205     DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
206               " cores.");
207
208   constexpr std::size_t tasksNumber = 150;
209   MyTask tasks[tasksNumber];
210   for(int type_id = 0; type_id < typesNumber; type_id++)
211     for(int j = type_id * tasksNumber / typesNumber;
212         j < (type_id + 1) * tasksNumber / typesNumber;
213         j++)
214         //            id,  ContainerType,       sleep (1|2s)
215         tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check);
216
217   DEBUG_LOG("Number of tasks: ", tasksNumber);
218   for(int type_id = 0; type_id < typesNumber; type_id++)
219     DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber, 
220               " to ", (type_id + 1) * tasksNumber / typesNumber,
221               " are of type ", check.types[type_id].name);
222
223   WorkloadManager::DefaultAlgorithm algo;
224   WorkloadManager::WorkloadManager wlm(algo);
225   for(std::size_t i=0; i < resourcesNumber; i ++)
226     wlm.addResource(check.resources[i]);
227
228   // Add 4 core tasks first
229   check.reset();
230   for(std::size_t i = 0; i < tasksNumber; i++)
231     wlm.addTask(&tasks[i]);
232   std::chrono::steady_clock::time_point start_time;
233   start_time = std::chrono::steady_clock::now();
234   wlm.start(); // tasks can be added before start.
235   wlm.stop();
236   std::chrono::steady_clock::time_point end_time;
237   end_time = std::chrono::steady_clock::now();
238   std::chrono::seconds duration;
239   duration = std::chrono::duration_cast<std::chrono::seconds>
240              (end_time - start_time);
241   std::chrono::seconds maxExpectedDuration(22);
242   CPPUNIT_ASSERT( duration < maxExpectedDuration );
243   DEBUG_LOG("Test step duration : ", duration.count(), "s");
244   check.globalCheck();
245
246   // Add 1 core tasks first
247   check.reset();
248   // WARNING: std::size_t is always >= 0
249   for(int i = tasksNumber-1; i >= 0; i--)
250     wlm.addTask(&tasks[i]);
251   start_time = std::chrono::steady_clock::now();
252   wlm.start(); // tasks can be added before start.
253   wlm.stop();
254   end_time = std::chrono::steady_clock::now();
255   duration = std::chrono::duration_cast<std::chrono::seconds>
256              (end_time - start_time);
257   CPPUNIT_ASSERT( duration < maxExpectedDuration );
258   DEBUG_LOG("Test step duration : ", duration.count(), "s");
259   check.globalCheck();
260
261   // Add 1 core tasks first & start before addTask
262   check.reset();
263   start_time = std::chrono::steady_clock::now();
264   wlm.start();
265   for(int i = tasksNumber-1; i >= 0; i--)
266     wlm.addTask(&tasks[i]);
267   wlm.stop();
268   end_time = std::chrono::steady_clock::now();
269   duration = std::chrono::duration_cast<std::chrono::seconds>
270              (end_time - start_time);
271   CPPUNIT_ASSERT( duration < maxExpectedDuration );
272   DEBUG_LOG("Test step duration : ", duration.count(), "s");
273   check.globalCheck();
274
275 }
276
277 /**
278  * Test the case of tasks which need no resources and can be run whithout
279  * waiting.
280  */
281 void MyTest::btest()
282 {
283   Checker<1, 1> check;
284   WorkloadManager::ContainerType ctype;
285   ctype.ignoreResources = true;
286   constexpr std::size_t tasksNumber = 20;
287   MyTask tasks[tasksNumber];
288   for(std::size_t i = 0; i < tasksNumber; i++)
289     tasks[i].reset(i, &ctype, 1, &check);
290   WorkloadManager::DefaultAlgorithm algo;
291   WorkloadManager::WorkloadManager wlm(algo);
292   // no resource needed
293   std::chrono::steady_clock::time_point start_time;
294   std::chrono::steady_clock::time_point end_time;
295   std::chrono::seconds duration;
296   start_time = std::chrono::steady_clock::now();
297   wlm.start();
298   for(std::size_t i = 0; i < tasksNumber; i++)
299     wlm.addTask(&tasks[i]);
300   wlm.stop();
301   end_time = std::chrono::steady_clock::now();
302   duration = std::chrono::duration_cast<std::chrono::seconds>
303              (end_time - start_time);
304   std::chrono::seconds maxExpectedDuration(2);
305   CPPUNIT_ASSERT( duration <= maxExpectedDuration);
306 }
307
308 CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
309
310 #include "BasicMainTest.hxx"