]> SALOME platform Git repositories - modules/yacs.git/blob - src/workloadmanager/WorkloadManager.cxx
Salome HOME
Workloadmanager deadlock on big foreach.
[modules/yacs.git] / src / workloadmanager / WorkloadManager.cxx
1 // Copyright (C) 2020-2021  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 #include "WorkloadManager.hxx"
20 #include "Task.hxx"
21 #include <chrono>
22 #include <thread>
23
24
25 namespace WorkloadManager
26 {
27   WorkloadManager::WorkloadManager(WorkloadAlgorithm& algo)
28   : _runningTasks()
29   , _finishedTasks()
30   , _nextIndex(0)
31   , _data_mutex()
32   , _startCondition()
33   , _endCondition()
34   , _stop(true)
35   , _otherThreads()
36   , _algo(algo)
37   {
38   }
39
40   WorkloadManager::~WorkloadManager()
41   {
42     stop();
43   }
44
45   void WorkloadManager::addResource(const Resource& r)
46   {
47     std::unique_lock<std::mutex> lock(_data_mutex);
48     _algo.addResource(r);
49     _startCondition.notify_one();
50   }
51
52   void WorkloadManager::freezeResources()
53   {
54     std::unique_lock<std::mutex> lock(_data_mutex);
55     _algo.freezeResources();
56     _startCondition.notify_one();
57   }
58
59   void WorkloadManager::addTask(Task* t)
60   {
61     std::unique_lock<std::mutex> lock(_data_mutex);
62     _algo.addTask(t);
63     _startCondition.notify_one();
64   }
65
66   void WorkloadManager::start()
67   {
68     {
69       std::unique_lock<std::mutex> lock(_data_mutex);
70       if(!_stop)
71         return; // already started
72       _stop = false;
73     }
74     _otherThreads.emplace_back(std::async(std::launch::async, [this]
75       {
76         runTasks();
77       }));
78     _otherThreads.emplace_back(std::async(std::launch::async, [this]
79       {
80         endTasks();
81       }));
82   }
83
84   void WorkloadManager::stop()
85   {
86     {
87       std::unique_lock<std::mutex> lock(_data_mutex);
88       _stop = true;
89       _algo.freezeResources();
90     }
91     _startCondition.notify_one();
92     _endCondition.notify_one();
93    for(std::future<void>& th : _otherThreads)
94      th.wait();
95   }
96
97   void WorkloadManager::runTasks()
98   {
99     bool threadStop = false;
100     while(!threadStop)
101     {
102       {
103         std::unique_lock<std::mutex> lock(_data_mutex);
104         _startCondition.wait(lock, [this] {return !_algo.empty() || _stop;});
105         RunningInfo taskInfo;
106         while(chooseTaskToRun(taskInfo))
107         {
108           _runningTasks.emplace(taskInfo.id, std::async(std::launch::async, [this, taskInfo]
109             {
110               runOneTask(taskInfo);
111             }));
112         }
113         threadStop = _stop && _algo.empty();
114       }
115       // workaroud to release the lock and give a chance to other tasks to finish
116       std::this_thread::sleep_for(std::chrono::milliseconds(1));
117     }
118   }
119
120   void WorkloadManager::runOneTask(const RunningInfo& taskInfo)
121   {
122     taskInfo.info.task->run(taskInfo.info.worker);
123
124     {
125       std::unique_lock<std::mutex> lock(_data_mutex);
126       _finishedTasks.push(taskInfo);
127       _endCondition.notify_one();
128     }
129   }
130
131   void WorkloadManager::endTasks()
132   {
133     bool threadStop = false;
134     while(!threadStop)
135     {
136       std::unique_lock<std::mutex> lock(_data_mutex);
137       _endCondition.wait(lock, [this]
138                             {
139                               return !_finishedTasks.empty() ||
140                               (_stop && _runningTasks.empty() && _algo.empty());
141                             });
142       while(!_finishedTasks.empty())
143       {
144         RunningInfo taskInfo = _finishedTasks.front();
145         _finishedTasks.pop();
146         _runningTasks[taskInfo.id].wait();
147         _runningTasks.erase(taskInfo.id);
148         _algo.liberate(taskInfo.info);
149       }
150       threadStop = _stop && _runningTasks.empty() && _algo.empty();
151       _startCondition.notify_one();
152     }
153   }
154
155   bool WorkloadManager::chooseTaskToRun(RunningInfo& taskInfo)
156   {
157     // We are already under the lock
158     taskInfo.id = _nextIndex;
159     taskInfo.info = _algo.chooseTask();
160     if(taskInfo.info.taskFound)
161       _nextIndex ++;
162     return taskInfo.info.taskFound;
163   }
164
165 }