Salome HOME
Initial commit
[modules/jobmanager.git] / src / engine / BL_JobsManager.cxx
1 //  Copyright (C) 2009 CEA/DEN, EDF R&D
2 //
3 //  This library is free software; you can redistribute it and/or
4 //  modify it under the terms of the GNU Lesser General Public
5 //  License as published by the Free Software Foundation; either
6 //  version 2.1 of the License.
7 //
8 //  This library is distributed in the hope that it will be useful,
9 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 //  Lesser General Public License for more details.
12 //
13 //  You should have received a copy of the GNU Lesser General Public
14 //  License along with this library; if not, write to the Free Software
15 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "BL_JobsManager.hxx"
21
22 BL::JobsManager::JobsManager(BL::SALOMEServices * salome_services)
23 {
24   DEBTRACE("Creating BL::JobsManager");
25   BL_ASSERT(salome_services);
26   _salome_services = salome_services;
27   _observer = NULL;
28 }
29
30 BL::JobsManager::~JobsManager()
31 {
32   DEBTRACE("Destroying BL::JobsManager");
33
34   // Delete All Jobs !
35   _jobs_it = _jobs.begin();
36   for(;_jobs_it != _jobs.end();_jobs_it++)
37     delete _jobs_it->second;
38 }
39
40 void 
41 BL::JobsManager::setObserver(BL::Observer * observer)
42 {
43   BL_ASSERT(observer);
44   _observer = observer;
45 }
46
47 BL::Job *
48 BL::JobsManager::addNewJob(const std::string & name)
49 {
50   DEBTRACE("addNewJob BL::JobsManager");
51
52   BL::Job * new_job = NULL;
53
54   _jobs_it = _jobs.find(name);
55   if (_jobs_it == _jobs.end())
56   {
57     new_job = new BL::Job(name);
58
59     // Ajout dans la liste
60     _jobs[name] = new_job;
61   }
62   else
63     DEBTRACE("addNewJob Error !!!! Job already exist: " << name);
64
65   return new_job;
66 }
67
68 void
69 BL::JobsManager::removeJob(const std::string & name)
70 {
71   DEBTRACE("removeJob BL::JobsManager");
72   _jobs_it = _jobs.find(name);
73   if (_jobs_it != _jobs.end())
74   {
75     std::string result = "";
76     if (_jobs_it->second->getSalomeLauncherId() != -1)
77       result = _salome_services->delete_job(_jobs_it->second);
78
79     delete _jobs_it->second;
80     _jobs.erase(_jobs_it->first);
81
82     if (_observer)
83       if (result != "")
84         _observer->sendEvent("delete_job", "Error", name, result);
85       else
86         _observer->sendEvent("delete_job", "Ok", name, "");
87   }
88   else
89     DEBTRACE("removeJob Error !!!! Job does not exist: " << name);
90 }
91
92 BL::Job *
93 BL::JobsManager::getJob(const std::string & name)
94 {
95   DEBTRACE("getJob BL::JobsManager called");
96   return _jobs[name];
97 }
98
99 std::map<std::string, BL::Job *> & 
100 BL::JobsManager::getJobs()
101 {
102   return _jobs;
103 }
104
105 bool 
106 BL::JobsManager::job_already_exist(const std::string & name)
107 {
108   _jobs_it = _jobs.find(name);
109
110   if (_jobs_it == _jobs.end())
111     return false;
112   else
113     return true;
114
115 }
116
117 void
118 BL::JobsManager::start_job(const std::string & name)
119 {
120   DEBTRACE("BL::JobsManager::start_job called for job: " << name);
121
122   // Check job exits
123   _jobs_it = _jobs.find(name);
124   if (_jobs_it == _jobs.end())
125   {
126     DEBTRACE("BL::JobsManager::start_job job unknown: " << name);
127     return;
128   }
129   else
130   {
131     _thread_mutex.lock();
132     BL::Job * job = _jobs_it->second; 
133     // Check Job Exec State - multithread protection
134     if (job->getThreadState() == BL::Job::NOTHING)
135     {
136       // Prepare Info for thread
137       BL::JobsManager::thread_info * ti = new thread_info();
138       ti->object_ptr = this;
139       ti->job_name = name;
140       job->setState(BL::Job::IN_PROCESS);
141       job->setThreadState(BL::Job::STARTING);
142       omni_thread::create(BL::JobsManager::starting_job_thread, ti);
143     }
144     else
145     {
146       DEBTRACE("BL::JobsManager::start_job nothin to do, job already starting, job name: " << name);
147     }
148     _thread_mutex.unlock();
149   }
150 }
151
152 void 
153 BL::JobsManager::starting_job_thread(void * object_ptr)
154 {
155   DEBTRACE("starting_job_thread BL::JobsManager called");
156   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
157   BL::JobsManager * object = ti->object_ptr;
158   std::string job_name =  ti->job_name;
159   BL::Job * job = object->getJob(job_name);
160
161   std::string result = object->_salome_services->start_job(job);
162
163   object->_thread_mutex.lock();
164   // End
165   if (result == "")
166   {
167     if (object->_observer)
168       object->_observer->sendEvent("start_job", "Ok", job_name, "");
169     job->setState(BL::Job::QUEUED);
170     job->setThreadState(BL::Job::NOTHING);
171   }
172   else
173   {
174     if (object->_observer)
175       object->_observer->sendEvent("start_job", "Error", job_name, result);
176     job->setState(BL::Job::ERROR);
177     job->setThreadState(BL::Job::NOTHING);
178   }
179   object->_thread_mutex.unlock();
180 }
181
182 void
183 BL::JobsManager::refresh_jobs()
184 {
185   DEBTRACE("refresh_jobs BL::JobsManager called");
186
187   omni_thread::create(BL::JobsManager::refresh_job, this);
188 }
189
190 void
191 BL::JobsManager::refresh_job(void * object_ptr)
192 {
193   DEBTRACE("refresh_job BL::JobsManager called");
194   BL::JobsManager * object = reinterpret_cast<BL::JobsManager*>(object_ptr);
195
196   //iterate on all jobs
197   _jobs_map::iterator jobs_it;
198   jobs_it = object->_jobs.begin();
199   for(;jobs_it != object->_jobs.end();jobs_it++)
200   {
201     BL::Job * job = jobs_it->second;
202     if (job->getSalomeLauncherId() != -1)
203     {
204       object->_thread_mutex.lock();
205       if (job->getThreadState() == BL::Job::NOTHING)
206       {
207         BL::Job::State job_state = job->getState();
208         if (job_state != BL::Job::FINISHED or job_state != BL::Job::ERROR)
209         {
210           std::string result = object->_salome_services->refresh_job(job);
211           if (result == "QUEUED")
212           {
213             if (job_state != BL::Job::QUEUED)
214             {
215               job->setState(BL::Job::QUEUED);
216               if (object->_observer)
217                 object->_observer->sendEvent("refresh_job", "Ok", job->getName(), "new state");
218             }
219           }
220           else if (result == "IN_PROCESS")
221           {
222             if (job_state != BL::Job::IN_PROCESS)
223             {
224               job->setState(BL::Job::IN_PROCESS);
225               if (object->_observer)
226                 object->_observer->sendEvent("refresh_job", "Ok", job->getName(), "new state");
227             }
228           }
229           else if (result == "RUNNING")
230           {
231             if (job_state != BL::Job::RUNNING)
232             {
233               job->setState(BL::Job::RUNNING);
234               if (object->_observer)
235                 object->_observer->sendEvent("refresh_job", "Ok", job->getName(), "new state");
236             }
237           }
238           else if (result == "PAUSED")
239           {
240             if (job_state != BL::Job::PAUSED)
241             {
242               job->setState(BL::Job::PAUSED);
243               if (object->_observer)
244                 object->_observer->sendEvent("refresh_job", "Ok", job->getName(), "new state");
245             }
246           }
247           else if (result == "FINISHED")
248           {
249             job->setState(BL::Job::FINISHED);
250             if (object->_observer)
251               object->_observer->sendEvent("refresh_job", "Ok", job->getName(), "new state");
252           }
253           else if (result == "ERROR")
254           {
255             job->setState(BL::Job::ERROR);
256             if (object->_observer)
257               object->_observer->sendEvent("refresh_job", "Ok", job->getName(), "new state");
258           }
259           else
260           {
261             // Error using launcher...
262             if (object->_observer)
263               object->_observer->sendEvent("refresh_job", "Error", job->getName(), result);
264           }
265         }
266       }
267       object->_thread_mutex.unlock();
268     }
269   }
270 }
271
272 void
273 BL::JobsManager::get_results_job(const std::string & name)
274 {
275   DEBTRACE("get_results_job BL::JobsManager called");
276    
277   // Check job exits
278   _jobs_it = _jobs.find(name);
279   if (_jobs_it == _jobs.end())
280   {
281     DEBTRACE("BL::JobsManager::get_results_job job unknown: " << name);
282     return;
283   }
284   else
285   {
286     BL::Job * job = _jobs_it->second; 
287     if (job->getState() == BL::Job::FINISHED)
288     {
289       // Prepare Info for thread
290       BL::JobsManager::thread_info * ti = new thread_info();
291       ti->object_ptr = this;
292       ti->job_name = name;
293       omni_thread::create(BL::JobsManager::get_results_job_thread, ti);
294     }
295     else
296     {
297       DEBTRACE("BL::JobsManager::get_results_job job bad job state !");
298       return;
299     }
300   }
301 }
302
303 void 
304 BL::JobsManager::get_results_job_thread(void * object_ptr)
305 {
306   DEBTRACE("get_results_job_thread BL::JobsManager called");
307   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
308   BL::JobsManager * object = ti->object_ptr;
309   std::string job_name =  ti->job_name;
310   BL::Job * job = object->getJob(job_name);
311
312   object->_thread_mutex_results.lock();
313   std::string result = object->_salome_services->get_results_job(job);
314
315   // End
316   if (result == "")
317   {
318     if (object->_observer)
319       object->_observer->sendEvent("get_results_job", "Ok", job_name, "");
320   }
321   else
322   {
323     if (object->_observer)
324       object->_observer->sendEvent("get_results_job", "Error", job_name, result);
325   }
326   object->_thread_mutex_results.unlock();
327 }