SALOME platform Git repositories - modules/yacs.git/blob - src/workloadmanager/WorkloadManager.cxx
Salome HOME
Merge branch 'V9_7_BR'
[modules/yacs.git] / src / workloadmanager / WorkloadManager.cxx
1 // Copyright (C) 2020-2021  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 #include "WorkloadManager.hxx"
20 #include "Task.hxx"
21
22 namespace WorkloadManager
23 {
24   WorkloadManager::WorkloadManager(WorkloadAlgorithm& algo)
25   : _runningTasks()
26   , _finishedTasks()
27   , _nextIndex(0)
28   , _data_mutex()
29   , _startCondition()
30   , _endCondition()
31   , _stop(true)
32   , _otherThreads()
33   , _algo(algo)
34   {
35   }
36
37   WorkloadManager::~WorkloadManager()
38   {
39     stop();
40   }
41
42   void WorkloadManager::addResource(const Resource& r)
43   {
44     std::unique_lock<std::mutex> lock(_data_mutex);
45     _algo.addResource(r);
46     _startCondition.notify_one();
47   }
48
49   void WorkloadManager::freezeResources()
50   {
51     std::unique_lock<std::mutex> lock(_data_mutex);
52     _algo.freezeResources();
53     _startCondition.notify_one();
54   }
55
56   void WorkloadManager::addTask(Task* t)
57   {
58     std::unique_lock<std::mutex> lock(_data_mutex);
59     _algo.addTask(t);
60     _startCondition.notify_one();
61   }
62
63   void WorkloadManager::start()
64   {
65     {
66       std::unique_lock<std::mutex> lock(_data_mutex);
67       if(!_stop)
68         return; // already started
69       _stop = false;
70     }
71     _otherThreads.emplace_back(std::async(std::launch::async, [this]
72       {
73         runTasks();
74       }));
75     _otherThreads.emplace_back(std::async(std::launch::async, [this]
76       {
77         endTasks();
78       }));
79   }
80
81   void WorkloadManager::stop()
82   {
83     {
84       std::unique_lock<std::mutex> lock(_data_mutex);
85       _stop = true;
86       _algo.freezeResources();
87     }
88     _startCondition.notify_one();
89     _endCondition.notify_one();
90    for(std::future<void>& th : _otherThreads)
91      th.wait();
92   }
93
94   void WorkloadManager::runTasks()
95   {
96     bool threadStop = false;
97     while(!threadStop)
98     {
99       std::unique_lock<std::mutex> lock(_data_mutex);
100       _startCondition.wait(lock, [this] {return !_algo.empty() || _stop;});
101       RunningInfo taskInfo;
102       while(chooseTaskToRun(taskInfo))
103       {
104         _runningTasks.emplace(taskInfo.id, std::async(std::launch::async, [this, taskInfo]
105           {
106             runOneTask(taskInfo);
107           }));
108       }
109       threadStop = _stop && _algo.empty();
110     }
111   }
112
113   void WorkloadManager::runOneTask(const RunningInfo& taskInfo)
114   {
115     taskInfo.info.task->run(taskInfo.info.worker);
116
117     {
118       std::unique_lock<std::mutex> lock(_data_mutex);
119       _finishedTasks.push(taskInfo);
120       _endCondition.notify_one();
121     }
122   }
123
124   void WorkloadManager::endTasks()
125   {
126     bool threadStop = false;
127     while(!threadStop)
128     {
129       std::unique_lock<std::mutex> lock(_data_mutex);
130       _endCondition.wait(lock, [this]
131                             {
132                               return !_finishedTasks.empty() ||
133                               (_stop && _runningTasks.empty() && _algo.empty());
134                             });
135       while(!_finishedTasks.empty())
136       {
137         RunningInfo taskInfo = _finishedTasks.front();
138         _finishedTasks.pop();
139         _runningTasks[taskInfo.id].wait();
140         _runningTasks.erase(taskInfo.id);
141         _algo.liberate(taskInfo.info);
142       }
143       threadStop = _stop && _runningTasks.empty() && _algo.empty();
144       _startCondition.notify_one();
145     }
146   }
147
148   bool WorkloadManager::chooseTaskToRun(RunningInfo& taskInfo)
149   {
150     // We are already under the lock
151     taskInfo.id = _nextIndex;
152     taskInfo.info = _algo.chooseTask();
153     if(taskInfo.info.taskFound)
154       _nextIndex ++;
155     return taskInfo.info.taskFound;
156   }
157
158 }