Salome HOME
Merge from V6_main 01/04/2013
[modules/jobmanager.git] / src / engine / BL_JobsManager.cxx
1 // Copyright (C) 2009-2013  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "BL_JobsManager.hxx"
21 #include <sstream>
22
23 #ifdef WNT
24 #undef ERROR
25 #endif
26
27 BL::JobsManager::JobsManager(BL::SALOMEServices * salome_services)
28 {
29   DEBTRACE("Creating BL::JobsManager");
30   BL_ASSERT(salome_services);
31   _salome_services = salome_services;
32   _salome_services->set_manager(this);
33   _observer = NULL;
34   _name_counter = 0;
35 }
36
37 BL::JobsManager::~JobsManager()
38 {
39   DEBTRACE("Destroying BL::JobsManager");
40
41   // Delete All Jobs !
42   _jobs_it = _jobs.begin();
43   for(;_jobs_it != _jobs.end();_jobs_it++)
44     delete _jobs_it->second;
45 }
46
47 void 
48 BL::JobsManager::setObserver(BL::Observer * observer)
49 {
50   BL_ASSERT(observer);
51   _observer = observer;
52 }
53
54 BL::Job *
55 BL::JobsManager::createJob(const std::string & name)
56 {
57   DEBTRACE("createJob BL::JobsManager");
58
59   BL::Job * new_job = NULL;
60
61   _thread_mutex_jobs_map.lock();
62   _jobs_it = _jobs.find(name);
63   if (_jobs_it == _jobs.end())
64   {
65     new_job = new BL::Job(name);
66
67     // Ajout dans la liste
68     _jobs[name] = new_job;
69   }
70   else
71     DEBTRACE("createJob Error !!!! Job already exist: " << name);
72
73   _thread_mutex_jobs_map.unlock();
74   return new_job;
75 }
76
77 void
78 BL::JobsManager::addJobToLauncher(const std::string & name)
79 {
80   DEBTRACE("addJobToLauncher BL::JobsManager");
81
82   _thread_mutex_jobs_map.lock();
83   _jobs_it = _jobs.find(name);
84   if (_jobs_it == _jobs.end())
85   {
86     // TODO: SHOULD SEND an exeception...
87     DEBMSG("[addJobToLauncher] failed, job was not found");
88   }
89   std::string result = "";
90   result = _salome_services->create_job(_jobs_it->second);
91
92   // Unlock is here to be sure that
93   // method setSalomeLauncherId is called if
94   // the creation is successfull
95   _thread_mutex_jobs_map.unlock();
96
97   if (_observer)
98     if (result != "")
99     {
100       _jobs_it->second->setState(BL::Job::NOT_CREATED);
101       _observer->sendEvent("create_job", "Error", name, result);
102     }
103     else
104       _observer->sendEvent("create_job", "Ok", name, "");
105 }
106
107 void
108 BL::JobsManager::removeJob(const std::string & name)
109 {
110   DEBTRACE("removeJob BL::JobsManager");
111   _thread_mutex_jobs_map.lock();
112   _jobs_it = _jobs.find(name);
113   if (_jobs_it != _jobs.end())
114   {
115     std::string result = "";
116     if (_jobs_it->second->getSalomeLauncherId() != -1)
117       result = _salome_services->delete_job(_jobs_it->second);
118
119     delete _jobs_it->second;
120     _jobs.erase(_jobs_it->first);
121
122     if (_observer)
123       if (result != "")
124         _observer->sendEvent("delete_job", "Error", name, result);
125       else
126         _observer->sendEvent("delete_job", "Ok", name, "");
127   }
128   else
129     DEBTRACE("removeJob Error !!!! Job does not exist: " << name);
130   _thread_mutex_jobs_map.unlock();
131 }
132
133 BL::Job *
134 BL::JobsManager::getJob(const std::string & name)
135 {
136   DEBTRACE("getJob BL::JobsManager called");
137   return _jobs[name];
138 }
139
140 std::map<std::string, BL::Job *> & 
141 BL::JobsManager::getJobs()
142 {
143   return _jobs;
144 }
145
146 bool 
147 BL::JobsManager::job_already_exist(const std::string & name)
148 {
149   bool result = true;
150
151   _thread_mutex_jobs_map.lock();
152   _jobs_it = _jobs.find(name);
153   if (_jobs_it == _jobs.end())
154     result = false;
155   _thread_mutex_jobs_map.unlock();
156
157   return result;
158 }
159
160 void
161 BL::JobsManager::start_job(const std::string & name)
162 {
163   DEBTRACE("BL::JobsManager::start_job called for job: " << name);
164   // Prepare Info for thread
165   BL::JobsManager::thread_info * ti = new thread_info();
166   ti->object_ptr = this;
167   ti->job_name = name;
168   omni_thread::create(BL::JobsManager::starting_job_thread, ti);
169
170 }
171
172 void 
173 BL::JobsManager::starting_job_thread(void * object_ptr)
174 {
175   DEBTRACE("starting_job_thread BL::JobsManager called");
176   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
177   BL::JobsManager * object = ti->object_ptr;
178   std::string job_name =  ti->job_name;
179
180   // Check job exits
181   object->_thread_mutex_jobs_map.lock();
182   object->_jobs_it = object->_jobs.find(job_name);
183   if (object->_jobs_it == object->_jobs.end())
184   {
185     DEBTRACE("BL::JobsManager::start_job job unknown: " << job_name);
186   }
187   else
188   {
189     BL::Job * job = object->getJob(job_name);
190     std::string result = object->_salome_services->start_job(job);
191     if (result == "")
192     {
193       job->setState(BL::Job::QUEUED);
194       job->setThreadState(BL::Job::NOTHING);
195       if (object->_observer)
196         object->_observer->sendEvent("start_job", "Ok", job_name, "");
197     }
198     else
199     {
200       job->setState(BL::Job::ERROR);
201       job->setThreadState(BL::Job::NOTHING);
202       if (object->_observer)
203         object->_observer->sendEvent("start_job", "Error", job_name, result);
204     }
205   }
206   object->_thread_mutex_jobs_map.unlock();
207 }
208
209 void
210 BL::JobsManager::stop_job(const std::string & name)
211 {
212   DEBTRACE("stop_job BL::JobsManager called");
213
214   _thread_mutex_jobs_map.lock();
215   // Check job exits
216   _jobs_it = _jobs.find(name);
217   if (_jobs_it == _jobs.end())
218   {
219     DEBTRACE("BL::JobsManager::stop_job job unknown: " << name);
220     _thread_mutex_jobs_map.unlock();
221     return;
222   }
223   else
224   {
225     // Prepare Info for thread
226     BL::JobsManager::thread_info * ti = new thread_info();
227     ti->object_ptr = this;
228     ti->job_name = name;
229     omni_thread::create(BL::JobsManager::stop_job_thread, ti);
230   }
231 }
232
233 void 
234 BL::JobsManager::stop_job_thread(void * object_ptr)
235 {
236   DEBTRACE("stop_job_thread BL::JobsManager called");
237   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
238   BL::JobsManager * object = ti->object_ptr;
239   std::string job_name =  ti->job_name;
240   BL::Job * job = object->getJob(job_name);
241
242   std::string result = object->_salome_services->stop_job(job);
243
244   // End
245   if (result == "")
246   {
247     if (object->_observer)
248       object->_observer->sendEvent("stop_job", "Ok", job_name, "");
249   }
250   else
251   {
252     if (object->_observer)
253       object->_observer->sendEvent("stop_job", "Error", job_name, result);
254   }
255   object->_thread_mutex_jobs_map.unlock();
256 }
257
258 void
259 BL::JobsManager::refresh_jobs()
260 {
261   DEBTRACE("refresh_jobs BL::JobsManager called");
262
263   omni_thread::create(BL::JobsManager::refresh_jobs_thread, this);
264 }
265
266 void
267 BL::JobsManager::refresh_jobs_thread(void * object_ptr)
268 {
269   DEBTRACE("refresh_job BL::JobsManager called");
270   BL::JobsManager * object = reinterpret_cast<BL::JobsManager*>(object_ptr);
271
272   //iterate on all jobs
273   object->_thread_mutex_jobs_map.lock();
274   _jobs_map::iterator jobs_it;
275   jobs_it = object->_jobs.begin();
276   for(;jobs_it != object->_jobs.end();jobs_it++)
277   {
278     BL::Job * job = jobs_it->second;
279     if (job->getSalomeLauncherId() != -1)
280     {
281       if (job->getThreadState() == BL::Job::NOTHING)
282       {
283         BL::Job::State job_state = job->getState();
284         if (job_state != BL::Job::FINISHED &&
285             job_state != BL::Job::ERROR    &&
286             job_state != BL::Job::FAILED   &&
287             job_state != BL::Job::NOT_CREATED)
288         {
289           std::string result_launcher = object->_salome_services->refresh_job(job);
290           std::string result_job = job->setStringState(result_launcher);
291           if (result_job == "RefreshError")
292           {
293             // Error using launcher...
294             if (object->_observer)
295               object->_observer->sendEvent("refresh_job", "Error", job->getName(), result_launcher);
296           }
297           else if (result_job != "")
298           {
299             if (object->_observer)
300               object->_observer->sendEvent("refresh_job", "Ok", job->getName(), result_job);
301           }
302         }
303       }
304     }
305   }
306   object->_thread_mutex_jobs_map.unlock();
307 }
308
309 void
310 BL::JobsManager::get_results_job(const std::string & name)
311 {
312   DEBTRACE("get_results_job BL::JobsManager called");
313
314   _thread_mutex_jobs_map.lock();
315   // Check job exits
316   _jobs_it = _jobs.find(name);
317   if (_jobs_it == _jobs.end())
318   {
319     DEBTRACE("BL::JobsManager::get_results_job job unknown: " << name);
320     _thread_mutex_jobs_map.unlock();
321     return;
322   }
323   else
324   {
325     BL::Job * job = _jobs_it->second; 
326     if (job->getState() == BL::Job::FINISHED ||
327         job->getState() == BL::Job::ERROR    ||
328         job->getState() == BL::Job::FAILED
329         )
330     {
331       // Prepare Info for thread
332       BL::JobsManager::thread_info * ti = new thread_info();
333       ti->object_ptr = this;
334       ti->job_name = name;
335       omni_thread::create(BL::JobsManager::get_results_job_thread, ti);
336     }
337     else
338     {
339       DEBTRACE("BL::JobsManager::get_results_job job bad job state !");
340       _thread_mutex_jobs_map.unlock();
341       return;
342     }
343   }
344 }
345
346 void 
347 BL::JobsManager::get_results_job_thread(void * object_ptr)
348 {
349   DEBTRACE("get_results_job_thread BL::JobsManager called");
350   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
351   BL::JobsManager * object = ti->object_ptr;
352   std::string job_name =  ti->job_name;
353   BL::Job * job = object->getJob(job_name);
354
355   std::string result = object->_salome_services->get_results_job(job);
356
357   // End
358   if (result == "")
359   {
360     if (object->_observer)
361       object->_observer->sendEvent("get_results_job", "Ok", job_name, "");
362   }
363   else
364   {
365     if (object->_observer)
366       object->_observer->sendEvent("get_results_job", "Error", job_name, result);
367   }
368   object->_thread_mutex_jobs_map.unlock();
369 }
370
371 void
372 BL::JobsManager::save_jobs(const std::string & xml_file)
373 {
374   DEBTRACE("BL::JobsManager::save_jobs called for : " << xml_file);
375
376   // Prepare Info for thread
377   BL::JobsManager::thread_info_file * ti = new thread_info_file();
378   ti->object_ptr = this;
379   ti->file_name = xml_file;
380   omni_thread::create(BL::JobsManager::save_jobs_thread, ti);
381 }
382
383 void
384 BL::JobsManager::load_jobs(const std::string & xml_file)
385 {
386   DEBTRACE("BL::JobsManager::load_jobs called for : " << xml_file);
387
388   // Prepare Info for thread
389   BL::JobsManager::thread_info_file * ti = new thread_info_file();
390   ti->object_ptr = this;
391   ti->file_name = xml_file;
392   omni_thread::create(BL::JobsManager::load_jobs_thread, ti);
393 }
394
395 void
396 BL::JobsManager::save_jobs_thread(void * object_ptr)
397 {
398   DEBTRACE("save_jobs_thread BL::JobsManager called");
399   BL::JobsManager::thread_info_file * ti = reinterpret_cast<BL::JobsManager::thread_info_file*>(object_ptr);
400   BL::JobsManager * object = ti->object_ptr;
401   std::string file_name =  ti->file_name;
402
403   object->_thread_mutex_jobs_map.lock();
404   std::string result = object->_salome_services->save_jobs(file_name);
405   object->_thread_mutex_jobs_map.unlock();
406
407   if (result != "")
408     if (object->_observer)
409       object->_observer->sendEvent("save_jobs", "Error", "", result);
410 }
411
412 void
413 BL::JobsManager::load_jobs_thread(void * object_ptr)
414 {
415   DEBTRACE("load_jobs_thread BL::JobsManager called");
416   BL::JobsManager::thread_info_file * ti = reinterpret_cast<BL::JobsManager::thread_info_file*>(object_ptr);
417   BL::JobsManager * object = ti->object_ptr;
418   std::string file_name =  ti->file_name;
419
420   object->_thread_mutex_jobs_map.lock();
421   std::string result = object->_salome_services->load_jobs(file_name);
422   object->_thread_mutex_jobs_map.unlock();
423
424   if (result != "")
425     if (object->_observer)
426       object->_observer->sendEvent("load_jobs", "Error", "", result);
427 }
428
429 void
430 BL::JobsManager::launcher_event_save_jobs(const std::string & data)
431 {
432   if (_observer)
433     _observer->sendEvent("save_jobs", "Ok", "", data);
434 }
435
436 void
437 BL::JobsManager::launcher_event_load_jobs(const std::string & data)
438 {
439   if (_observer)
440     _observer->sendEvent("load_jobs", "Ok", "", data);
441 }
442
443 void
444 BL::JobsManager::launcher_event_new_job(const std::string & data)
445 {
446   int job_number;
447   std::istringstream job_number_stream(data);
448   if (job_number_stream >> job_number)
449   {
450         DEBTRACE("Receive NEWJOB:" << job_number);
451     BL::JobsManager::thread_info_new_job * ti = new thread_info_new_job();
452     ti->object_ptr = this;
453     ti->job_number = job_number;
454     omni_thread::create(BL::JobsManager::launcher_event_new_job_thread, ti);
455   }
456 }
457
458 void
459 BL::JobsManager::launcher_event_update_job_state(const std::string & data)
460 {
461   refresh_jobs();
462 }
463
464 void
465 BL::JobsManager::launcher_event_new_job_thread(void * object_ptr)
466 {
467   DEBTRACE("Start of BL::JobsManager::launcher_event_new_job_thread");
468   BL::JobsManager::thread_info_new_job * ti = reinterpret_cast<BL::JobsManager::thread_info_new_job*>(object_ptr);
469   BL::JobsManager * object = ti->object_ptr;
470   int job_number =  ti->job_number;
471
472   object->_thread_mutex_jobs_map.lock();
473
474   // 1: Check if job is not already on our map
475   bool job_in_map = false;
476   _jobs_map::iterator jobs_it;
477   jobs_it = object->_jobs.begin();
478   for(;jobs_it != object->_jobs.end();jobs_it++)
479   {
480     BL::Job * job = jobs_it->second;
481     if (job->getSalomeLauncherId() == job_number)
482       job_in_map = true;
483   }
484
485   if (!job_in_map)
486   {
487     // 2: We try to get job informations
488
489     BL::Job * new_job = object->_salome_services->get_new_job(job_number);
490
491     // 3: We add it
492     if (new_job)
493     {
494       // 4: Check if job has a name or if the name already exists
495       if (new_job->getName() == "")
496       {
497         std::ostringstream name_stream;
498         name_stream << "no_name_" << object->_name_counter;
499         object->_name_counter++;
500         new_job->setName(name_stream.str());
501       }
502
503       _jobs_map::iterator _jobs_it_name = object->_jobs.find(new_job->getName());
504       if (_jobs_it_name != object->_jobs.end())
505       {
506         std::ostringstream name_stream;
507         name_stream << new_job->getName() << "_" << object->_name_counter;
508         object->_name_counter++;
509         new_job->setName(name_stream.str());
510       }
511       // 5: Insert job
512       object->_jobs[new_job->getName()] = new_job;
513       if (object->_observer)
514         object->_observer->sendEvent("add_job", "Ok", new_job->getName(), "");
515     }
516   }
517
518   object->_thread_mutex_jobs_map.unlock();
519 }
520
521 void
522 BL::JobsManager::launcher_event_remove_job(const std::string & data)
523 {
524   int job_number;
525   std::istringstream job_number_stream(data);
526   if (job_number_stream >> job_number)
527   {
528     BL::JobsManager::thread_info_new_job * ti = new thread_info_new_job();
529     ti->object_ptr = this;
530     ti->job_number = job_number;
531     omni_thread::create(BL::JobsManager::launcher_event_remove_job_thread, ti);
532   }
533 }
534
535 void
536 BL::JobsManager::launcher_event_remove_job_thread(void * object_ptr)
537 {
538   DEBTRACE("Start of BL::JobsManager::launcher_event_remove_job_thread");
539   BL::JobsManager::thread_info_new_job * ti = reinterpret_cast<BL::JobsManager::thread_info_new_job*>(object_ptr);
540   BL::JobsManager * object = ti->object_ptr;
541   int job_number =  ti->job_number;
542
543   object->_thread_mutex_jobs_map.lock();
544
545   _jobs_map::iterator jobs_it;
546   jobs_it = object->_jobs.begin();
547   for(;jobs_it != object->_jobs.end();jobs_it++)
548   {
549     BL::Job * job = jobs_it->second;
550     if (job->getSalomeLauncherId() == job_number)
551     {
552       job->setSalomeLauncherId(-1);
553       if (object->_observer)
554         object->_observer->sendEvent("to_remove_job", "Ok", job->getName(), "");
555     }
556   }
557
558   object->_thread_mutex_jobs_map.unlock();
559 }