1 // Copyright (C) 2020-2023 CEA, EDF
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include <cppunit/TestFixture.h>
26 #include <cppunit/extensions/HelperMacros.h>
35 #include "../WorkloadManager.hxx"
36 #include "../DefaultAlgorithm.hxx"
38 constexpr bool ACTIVATE_DEBUG_LOG = false;
39 template<typename... Ts>
40 void DEBUG_LOG(Ts... args)
42 if(! ACTIVATE_DEBUG_LOG)
44 if(sizeof...(Ts) == 0)
46 std::ostringstream message;
47 // TODO: C++17 solution: ( (message << args), ...);
48 // since initializer lists guarantee sequencing, this can be used to
49 // call a function on each element of a pack, in order:
50 int dummy[] = { (message << args, 0)...};
52 std::cerr << message.str();
59 virtual void check(const WorkloadManager::RunInfo& c, MyTask* t)=0;
62 template <std::size_t size_R, std::size_t size_T>
63 class Checker : public AbstractChecker
67 void check(const WorkloadManager::RunInfo& c, MyTask* t)override;
71 WorkloadManager::Resource resources[size_R];
72 WorkloadManager::ContainerType types[size_T];
75 int _maxContainersForResource[size_R][size_T];
78 class MyTask : public WorkloadManager::Task
81 const WorkloadManager::ContainerType& type()const override {return *_type;}
82 void run(const WorkloadManager::RunInfo& c)override
84 _check->check(c, this);
86 DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name,
88 std::this_thread::sleep_for(std::chrono::seconds(_sleep));
89 DEBUG_LOG("Finish task ", _id);
93 const WorkloadManager::ContainerType* type,
95 AbstractChecker * check
105 const WorkloadManager::ContainerType* _type = nullptr;
107 AbstractChecker * _check;
110 template <std::size_t size_R, std::size_t size_T>
111 Checker<size_R, size_T>::Checker()
113 for(std::size_t i=0; i < size_R; i ++)
116 std::ostringstream name;
118 resources[i].name = name.str();
121 for(std::size_t i=0; i < size_T; i ++)
124 std::ostringstream name;
126 types[i].name = name.str();
129 for(std::size_t i=0; i < size_R; i++)
130 for(std::size_t j=0; j < size_T; j++)
131 _maxContainersForResource[i][j] = 0;
134 template <std::size_t size_R, std::size_t size_T>
135 void Checker<size_R, size_T>::check(const WorkloadManager::RunInfo& c,
138 std::unique_lock<std::mutex> lock(_mutex);
139 int& max = _maxContainersForResource[c.resource.id][c.type.id];
144 template <std::size_t size_R, std::size_t size_T>
145 void Checker<size_R, size_T>::globalCheck()
147 for(std::size_t i=0; i < size_R; i++)
149 float global_max = 0;
150 for(std::size_t j=0; j < size_T; j++)
152 int max = _maxContainersForResource[i][j];
153 DEBUG_LOG(resources[i].name, ", ", types[j].name,
154 " max simultaneous runs:", max+1);
155 CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
156 global_max += types[j].neededCores * float(max+1);
158 DEBUG_LOG(resources[i].name, " max cores added for evry type: ", global_max);
159 // This assertion may be false if there are more resources than needed.
160 CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
164 template <std::size_t size_R, std::size_t size_T>
165 void Checker<size_R, size_T>::reset()
167 for(std::size_t i=0; i < size_R; i++)
168 for(std::size_t j=0; j < size_T; j++)
169 _maxContainersForResource[i][j] = 0;
172 class MyTest: public CppUnit::TestFixture
174 CPPUNIT_TEST_SUITE(MyTest);
178 CPPUNIT_TEST_SUITE_END();
181 void btest(); // ignore resources
182 void ctest(); // no available resource
186 * General test with 150 tasks of 3 types:
187 * - 50 tasks which need 4 cores for 2s each
188 * - 50 tasks which need 1 core for 1s each
189 * - 50 tasks which need no core but take 2s each
190 * We use 2 resources: 10 cores and 18 cores
191 * We verify the global time of execution.
195 constexpr std::size_t resourcesNumber = 2;
196 constexpr std::size_t typesNumber = 3;
197 Checker<resourcesNumber, typesNumber> check;
198 check.resources[0].nbCores = 10;
199 check.resources[1].nbCores = 18;
200 check.types[0].neededCores = 4.0;
201 check.types[1].neededCores = 1.0;
202 check.types[2].neededCores = 0.0; // tasks to be run with no cost
204 for(std::size_t i=0; i < resourcesNumber; i ++)
205 DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
207 for(std::size_t i=0; i < typesNumber; i ++)
208 DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
211 constexpr std::size_t tasksNumber = 150;
212 MyTask tasks[tasksNumber];
213 for(int type_id = 0; type_id < typesNumber; type_id++)
214 for(int j = type_id * tasksNumber / typesNumber;
215 j < (type_id + 1) * tasksNumber / typesNumber;
217 // id, ContainerType, sleep (1|2s)
218 tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check);
220 DEBUG_LOG("Number of tasks: ", tasksNumber);
221 for(int type_id = 0; type_id < typesNumber; type_id++)
222 DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber,
223 " to ", (type_id + 1) * tasksNumber / typesNumber,
224 " are of type ", check.types[type_id].name);
226 WorkloadManager::DefaultAlgorithm algo;
227 WorkloadManager::WorkloadManager wlm(algo);
228 for(std::size_t i=0; i < resourcesNumber; i ++)
229 wlm.addResource(check.resources[i]);
231 // Add 4 core tasks first
233 for(std::size_t i = 0; i < tasksNumber; i++)
234 wlm.addTask(&tasks[i]);
235 std::chrono::steady_clock::time_point start_time;
236 start_time = std::chrono::steady_clock::now();
237 wlm.start(); // tasks can be added before start.
239 std::chrono::steady_clock::time_point end_time;
240 end_time = std::chrono::steady_clock::now();
241 std::chrono::seconds duration;
242 duration = std::chrono::duration_cast<std::chrono::seconds>
243 (end_time - start_time);
244 std::chrono::seconds maxExpectedDuration(22);
245 CPPUNIT_ASSERT( duration < maxExpectedDuration );
246 DEBUG_LOG("Test step duration : ", duration.count(), "s");
249 // Add 1 core tasks first
251 // WARNING: std::size_t is always >= 0
252 for(int i = tasksNumber-1; i >= 0; i--)
253 wlm.addTask(&tasks[i]);
254 start_time = std::chrono::steady_clock::now();
255 wlm.start(); // tasks can be added before start.
257 end_time = std::chrono::steady_clock::now();
258 duration = std::chrono::duration_cast<std::chrono::seconds>
259 (end_time - start_time);
260 CPPUNIT_ASSERT( duration < maxExpectedDuration );
261 DEBUG_LOG("Test step duration : ", duration.count(), "s");
264 // Add 1 core tasks first & start before addTask
266 start_time = std::chrono::steady_clock::now();
268 for(int i = tasksNumber-1; i >= 0; i--)
269 wlm.addTask(&tasks[i]);
271 end_time = std::chrono::steady_clock::now();
272 duration = std::chrono::duration_cast<std::chrono::seconds>
273 (end_time - start_time);
274 CPPUNIT_ASSERT( duration < maxExpectedDuration );
275 DEBUG_LOG("Test step duration : ", duration.count(), "s");
281 * Test the case of tasks which need no resources and can be run whithout
287 WorkloadManager::ContainerType ctype;
288 ctype.ignoreResources = true;
289 constexpr std::size_t tasksNumber = 20;
290 MyTask tasks[tasksNumber];
291 for(std::size_t i = 0; i < tasksNumber; i++)
292 tasks[i].reset(i, &ctype, 1, &check);
293 WorkloadManager::DefaultAlgorithm algo;
294 WorkloadManager::WorkloadManager wlm(algo);
295 // no resource needed
296 std::chrono::steady_clock::time_point start_time;
297 std::chrono::steady_clock::time_point end_time;
298 std::chrono::seconds duration;
299 start_time = std::chrono::steady_clock::now();
301 for(std::size_t i = 0; i < tasksNumber; i++)
302 wlm.addTask(&tasks[i]);
304 end_time = std::chrono::steady_clock::now();
305 duration = std::chrono::duration_cast<std::chrono::seconds>
306 (end_time - start_time);
307 std::chrono::seconds maxExpectedDuration(2);
308 CPPUNIT_ASSERT( duration <= maxExpectedDuration);
312 * Test the case of a task which need more cores than any resource has.
314 class ErrorTask : public WorkloadManager::Task
317 ErrorTask(int nb_cores): WorkloadManager::Task(), _type(), _ok(), _message()
319 _type.ignoreResources = false;
320 _type.neededCores = nb_cores;
323 const WorkloadManager::ContainerType& type()const override {return _type;}
325 void run(const WorkloadManager::RunInfo& c)override
328 _message = c.error_message;
331 bool checkState(bool ok, const std::string& message)
333 return (ok == _ok) && (message == _message);
337 WorkloadManager::ContainerType _type;
339 std::string _message;
344 WorkloadManager::Resource r;
348 ErrorTask t1(1), t2(10);
349 WorkloadManager::DefaultAlgorithm algo;
350 WorkloadManager::WorkloadManager wlm(algo);
356 CPPUNIT_ASSERT(t1.checkState(true, ""));
357 CPPUNIT_ASSERT(t2.checkState(false, "No resource can run this task."));
358 // no error mode: wait for a resource to be added
359 WorkloadManager::DefaultAlgorithm algo_noerror;
360 WorkloadManager::WorkloadManager wlm2(algo_noerror);
365 std::this_thread::sleep_for(std::chrono::seconds(1));
371 CPPUNIT_ASSERT(t1.checkState(true, ""));
372 CPPUNIT_ASSERT(t2.checkState(true, ""));
375 CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
377 #include "BasicMainTest.hxx"