1 // Copyright (C) 2020 EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 #include <cppunit/TestFixture.h>
25 #include <cppunit/extensions/HelperMacros.h>
34 #include "../WorkloadManager.hxx"
35 #include "../DefaultAlgorithm.hxx"
// Compile-time switch for the traces emitted by DEBUG_LOG; it is false here.
37 constexpr bool ACTIVATE_DEBUG_LOG = false;
// Variadic trace helper for this test file: streams every argument, in order,
// into one std::ostringstream and writes the resulting text to std::cerr.
// Arguments are taken by value and must be streamable with operator<<.
38 template<typename... Ts>
39 void DEBUG_LOG(Ts... args)
// Tests the global switch first.
// NOTE(review): presumably bails out when logging is disabled - confirm.
41 if(! ACTIVATE_DEBUG_LOG)
// Tests for an empty argument pack, for which the `dummy` array below would
// have no element.
43 if(sizeof...(Ts) == 0)
45 std::ostringstream message;
46 // TODO: C++17 solution: ( (message << args), ...);
47 // since initializer lists guarantee sequencing, this can be used to
48 // call a function on each element of a pack, in order:
// Each array element evaluates `message << args` and then yields 0 through
// the comma operator; `dummy` only exists to drive that ordered evaluation.
49 int dummy[] = { (message << args, 0)...};
// The text is assembled first and handed to std::cerr in a single insertion.
51 std::cerr << message.str();
// Pure virtual hook called by MyTask::run with the RunInfo it received and a
// pointer to the running task; Checker provides the override.
58 virtual void check(const WorkloadManager::RunInfo& c, MyTask* t)=0;
// Test-side checker parameterised by the number of resources (size_R) and the
// number of container types (size_T) it keeps track of.
61 template <std::size_t size_R, std::size_t size_T>
62 class Checker : public AbstractChecker
// Implementation of the AbstractChecker hook; MyTask::run calls it with the
// RunInfo of the run and the task itself.
66 void check(const WorkloadManager::RunInfo& c, MyTask* t)override;
// Resource and container-type descriptions. The tests access them directly,
// e.g. to set nbCores / neededCores and to hand them to the workload manager.
70 WorkloadManager::Resource resources[size_R];
71 WorkloadManager::ContainerType types[size_T];
// One counter per (resource, container type) pair, set to 0 by the
// constructor and by reset(). globalCheck() reports value + 1 as the maximum
// number of simultaneous runs.
// NOTE(review): presumably updated by check() - confirm.
74 int _maxContainersForResource[size_R][size_T];
// Task used by the tests: when run, it reports to its checker and then sleeps
// for _sleep seconds to stand in for real work.
77 class MyTask : public WorkloadManager::Task
// Returns the container type this task asks for. Dereferences _type, whose
// in-class default is nullptr, so _type must be set before this is called.
80 const WorkloadManager::ContainerType& type()const override {return *_type;}
// Entry point executed with the RunInfo (resource and container type) chosen
// for this task.
81 void run(const WorkloadManager::RunInfo& c)override
// Hand the run over to the checker before the simulated work starts.
83 _check->check(c, this);
85 DEBUG_LOG("Running task ", _id, " on ", c.resource.name, "-", c.type.name,
// Simulated workload: block the calling thread for _sleep whole seconds.
87 std::this_thread::sleep_for(std::chrono::seconds(_sleep));
88 DEBUG_LOG("Finish task ", _id);
92 const WorkloadManager::ContainerType* type,
94 AbstractChecker * check
// Container type returned by type(); the tests pass the address of an object
// they own themselves.
104 const WorkloadManager::ContainerType* _type = nullptr;
// Checker notified by run(); no in-class initialiser is given here.
106 AbstractChecker * _check;
// Constructor: gives a name to every resource and every container type, then
// sets every (resource, type) counter to 0.
109 template <std::size_t size_R, std::size_t size_T>
110 Checker<size_R, size_T>::Checker()
// Resources: the name is assembled in a std::ostringstream and copied in.
112 for(std::size_t i=0; i < size_R; i ++)
115 std::ostringstream name;
117 resources[i].name = name.str();
// Container types: same scheme as for the resources.
120 for(std::size_t i=0; i < size_T; i ++)
123 std::ostringstream name;
125 types[i].name = name.str();
// Counter table starts at 0 everywhere (same loops as reset()).
128 for(std::size_t i=0; i < size_R; i++)
129 for(std::size_t j=0; j < size_T; j++)
130 _maxContainersForResource[i][j] = 0;
// Checker hook called from MyTask::run: works on the counter slot that
// matches the resource and the container type found in the RunInfo.
133 template <std::size_t size_R, std::size_t size_T>
134 void Checker<size_R, size_T>::check(const WorkloadManager::RunInfo& c,
// _mutex is held from here on, so the counter table is accessed under lock.
137 std::unique_lock<std::mutex> lock(_mutex);
// resource.id and type.id are used directly as array indices, without range
// check: they are expected to be below size_R and size_T respectively.
138 int& max = _maxContainersForResource[c.resource.id][c.type.id];
// Verification of the counter table, resource by resource, through CppUnit
// assertions:
//  - per container type, (counter + 1) runs of that type must fit in the
//    number of cores of the resource;
//  - summed over all types, (counter + 1) * neededCores must be at least the
//    number of cores of the resource.
143 template <std::size_t size_R, std::size_t size_T>
144 void Checker<size_R, size_T>::globalCheck()
146 for(std::size_t i=0; i < size_R; i++)
// Accumulates, for resource i, the cores accounted for by every type.
148 float global_max = 0;
149 for(std::size_t j=0; j < size_T; j++)
151 int max = _maxContainersForResource[i][j];
152 DEBUG_LOG(resources[i].name, ", ", types[j].name,
153 " max simultaneous runs:", max+1);
// The resource must never have been over-committed by a single type.
154 CPPUNIT_ASSERT( (max+1) * types[j].neededCores <= resources[i].nbCores );
155 global_max += types[j].neededCores * float(max+1);
157 DEBUG_LOG(resources[i].name, " max cores added for evry type: ", global_max);
158 // This assertion may be false if there are more resources than needed.
159 CPPUNIT_ASSERT(global_max >= resources[i].nbCores); // cores fully used
// Sets every (resource, container type) counter back to 0, which is also the
// state the constructor leaves the table in.
163 template <std::size_t size_R, std::size_t size_T>
164 void Checker<size_R, size_T>::reset()
166 for(std::size_t i=0; i < size_R; i++)
167 for(std::size_t j=0; j < size_T; j++)
168 _maxContainersForResource[i][j] = 0;
// CppUnit fixture that holds the workload manager tests of this file.
171 class MyTest: public CppUnit::TestFixture
// Start of the CppUnit suite declaration for this fixture.
173 CPPUNIT_TEST_SUITE(MyTest);
// End of the CppUnit suite declaration.
177 CPPUNIT_TEST_SUITE_END();
180 void btest(); // ignore resources
181 void ctest(); // no available resource
185 * General test with 150 tasks of 3 types:
186 * - 50 tasks which need 4 cores for 2s each
187 * - 50 tasks which need 1 core for 1s each
188 * - 50 tasks which need no core but take 2s each
189 * We use 2 resources: 10 cores and 18 cores
190 * We verify the global time of execution.
// Fixture data: 2 resources and 3 container types, held by the checker.
194 constexpr std::size_t resourcesNumber = 2;
195 constexpr std::size_t typesNumber = 3;
196 Checker<resourcesNumber, typesNumber> check;
// Capacities of the two resources, then cores needed by each container type.
197 check.resources[0].nbCores = 10;
198 check.resources[1].nbCores = 18;
199 check.types[0].neededCores = 4.0;
200 check.types[1].neededCores = 1.0;
201 check.types[2].neededCores = 0.0; // tasks to be run with no cost
203 for(std::size_t i=0; i < resourcesNumber; i ++)
204 DEBUG_LOG(check.resources[i].name, " has ", check.resources[i].nbCores,
206 for(std::size_t i=0; i < typesNumber; i ++)
207 DEBUG_LOG(check.types[i].name, " needs ", check.types[i].neededCores,
// 150 tasks split into three contiguous index ranges of equal size, one range
// per container type: [0,50) is type 0, [50,100) type 1, [100,150) type 2.
210 constexpr std::size_t tasksNumber = 150;
211 MyTask tasks[tasksNumber];
// NOTE(review): type_id and j are int while the bounds are std::size_t, so
// these comparisons mix signed and unsigned operands.
212 for(int type_id = 0; type_id < typesNumber; type_id++)
213 for(int j = type_id * tasksNumber / typesNumber;
214 j < (type_id + 1) * tasksNumber / typesNumber;
216 // id, ContainerType, sleep (1|2s)
// 2 - type_id % 2 gives 2 s for types 0 and 2, and 1 s for type 1.
217 tasks[j].reset(j, &check.types[type_id], 2-type_id%2, &check);
219 DEBUG_LOG("Number of tasks: ", tasksNumber);
220 for(int type_id = 0; type_id < typesNumber; type_id++)
221 DEBUG_LOG("Tasks from ", type_id * tasksNumber / typesNumber,
222 " to ", (type_id + 1) * tasksNumber / typesNumber,
223 " are of type ", check.types[type_id].name);
// Manager under test, driven by the default algorithm and given both
// resources of the checker.
225 WorkloadManager::DefaultAlgorithm algo;
226 WorkloadManager::WorkloadManager wlm(algo);
227 for(std::size_t i=0; i < resourcesNumber; i ++)
228 wlm.addResource(check.resources[i]);
230 // Add 4 core tasks first
// Step 1: tasks are queued in increasing index order, i.e. type 0 first.
232 for(std::size_t i = 0; i < tasksNumber; i++)
233 wlm.addTask(&tasks[i]);
// Wall-clock measurement with the monotonic steady_clock.
234 std::chrono::steady_clock::time_point start_time;
235 start_time = std::chrono::steady_clock::now();
236 wlm.start(); // tasks can be added before start.
238 std::chrono::steady_clock::time_point end_time;
239 end_time = std::chrono::steady_clock::now();
// duration_cast to seconds truncates the elapsed time to whole seconds before
// it is compared with the 22 s budget shared by the three steps.
240 std::chrono::seconds duration;
241 duration = std::chrono::duration_cast<std::chrono::seconds>
242 (end_time - start_time);
243 std::chrono::seconds maxExpectedDuration(22);
244 CPPUNIT_ASSERT( duration < maxExpectedDuration );
245 DEBUG_LOG("Test step duration : ", duration.count(), "s");
248 // Add 1 core tasks first
// Step 2: tasks are queued in decreasing index order, so the highest index
// range (type 2, which needs no core) is queued before types 1 and 0.
250 // WARNING: std::size_t is always >= 0
251 for(int i = tasksNumber-1; i >= 0; i--)
252 wlm.addTask(&tasks[i]);
253 start_time = std::chrono::steady_clock::now();
254 wlm.start(); // tasks can be added before start.
256 end_time = std::chrono::steady_clock::now();
257 duration = std::chrono::duration_cast<std::chrono::seconds>
258 (end_time - start_time);
259 CPPUNIT_ASSERT( duration < maxExpectedDuration );
260 DEBUG_LOG("Test step duration : ", duration.count(), "s");
263 // Add 1 core tasks first & start before addTask
// Step 3: the clock is started before the tasks are queued, again in
// decreasing index order.
265 start_time = std::chrono::steady_clock::now();
267 for(int i = tasksNumber-1; i >= 0; i--)
268 wlm.addTask(&tasks[i]);
270 end_time = std::chrono::steady_clock::now();
271 duration = std::chrono::duration_cast<std::chrono::seconds>
272 (end_time - start_time);
273 CPPUNIT_ASSERT( duration < maxExpectedDuration );
274 DEBUG_LOG("Test step duration : ", duration.count(), "s");
280 * Test the case of tasks which need no resources and can be run without
// Single container type flagged so that resources are ignored for its tasks.
286 WorkloadManager::ContainerType ctype;
287 ctype.ignoreResources = true;
// 20 tasks of that type, each one sleeping for 1 second when run.
288 constexpr std::size_t tasksNumber = 20;
289 MyTask tasks[tasksNumber];
290 for(std::size_t i = 0; i < tasksNumber; i++)
291 tasks[i].reset(i, &ctype, 1, &check);
// The manager is deliberately left without any resource.
292 WorkloadManager::DefaultAlgorithm algo;
293 WorkloadManager::WorkloadManager wlm(algo);
294 // no resource needed
295 std::chrono::steady_clock::time_point start_time;
296 std::chrono::steady_clock::time_point end_time;
297 std::chrono::seconds duration;
// Wall-clock measurement: the clock is started before the tasks are queued.
298 start_time = std::chrono::steady_clock::now();
300 for(std::size_t i = 0; i < tasksNumber; i++)
301 wlm.addTask(&tasks[i]);
303 end_time = std::chrono::steady_clock::now();
// Elapsed time truncated to whole seconds; the 2 s bound is inclusive here.
304 duration = std::chrono::duration_cast<std::chrono::seconds>
305 (end_time - start_time);
306 std::chrono::seconds maxExpectedDuration(2);
307 CPPUNIT_ASSERT( duration <= maxExpectedDuration);
311 * Test the case of a task which needs more cores than any resource has.
// Task that remembers the outcome of its run (a flag and an error message) so
// that a test can compare it with what it expects.
313 class ErrorTask : public WorkloadManager::Task
// nb_cores: number of cores requested through the task's container type.
// _type, _ok and _message are value-initialised before the body runs.
316 ErrorTask(int nb_cores): WorkloadManager::Task(), _type(), _ok(), _message()
// This task does take resources into account and asks for nb_cores cores.
318 _type.ignoreResources = false;
319 _type.neededCores = nb_cores;
// Container type owned by the task itself.
322 const WorkloadManager::ContainerType& type()const override {return _type;}
// Called with the RunInfo of the run; keeps a copy of its error message.
324 void run(const WorkloadManager::RunInfo& c)override
327 _message = c.error_message;
// Returns true only when both the stored flag and the stored message are
// equal to the expected values passed in.
330 bool checkState(bool ok, const std::string& message)
332 return (ok == _ok) && (message == _message);
336 WorkloadManager::ContainerType _type;
338 std::string _message;
// Resource used by this test.
343 WorkloadManager::Resource r;
// t1 asks for 1 core and t2 for 10 cores (ErrorTask stores its argument as
// the neededCores of its container type).
347 ErrorTask t1(1), t2(10);
348 WorkloadManager::DefaultAlgorithm algo;
349 WorkloadManager::WorkloadManager wlm(algo);
// Expected state after the first manager: t1 is ok with an empty message,
// t2 is not ok and carries exactly this error message.
355 CPPUNIT_ASSERT(t1.checkState(true, ""));
356 CPPUNIT_ASSERT(t2.checkState(false, "No resource can run this task."));
357 // no error mode: wait for a resource to be added
// Second scenario, with its own algorithm and manager instances.
358 WorkloadManager::DefaultAlgorithm algo_noerror;
359 WorkloadManager::WorkloadManager wlm2(algo_noerror);
// Pauses the test thread for one second.
// NOTE(review): presumably leaves the manager time to reach its waiting state
// before the test goes on - confirm.
364 std::this_thread::sleep_for(std::chrono::seconds(1));
// Expected state after the second manager: both tasks are ok, with no error
// message.
370 CPPUNIT_ASSERT(t1.checkState(true, ""));
371 CPPUNIT_ASSERT(t2.checkState(true, ""));
374 CPPUNIT_TEST_SUITE_REGISTRATION(MyTest);
376 #include "BasicMainTest.hxx"