Salome HOME
9a7ab789258930c89fb0397eadacc87f3d014b73
[modules/jobmanager.git] / src / engine / BL_JobsManager.cxx
1 // Copyright (C) 2009-2013  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "BL_JobsManager.hxx"
21 #include <sstream>
22
23 #ifdef WNT
24 #undef ERROR
25 #endif
26
27 BL::JobsManager::JobsManager(BL::SALOMEServices * salome_services)
28 {
29   DEBTRACE("Creating BL::JobsManager");
30   BL_ASSERT(salome_services);
31   _salome_services = salome_services;
32   _salome_services->set_manager(this);
33   _observer = NULL;
34   _name_counter = 0;
35 }
36
37 BL::JobsManager::~JobsManager()
38 {
39   DEBTRACE("Destroying BL::JobsManager");
40
41   // Delete All Jobs !
42   _jobs_it = _jobs.begin();
43   for(;_jobs_it != _jobs.end();_jobs_it++)
44     delete _jobs_it->second;
45 }
46
47 void 
48 BL::JobsManager::setObserver(BL::Observer * observer)
49 {
50   BL_ASSERT(observer);
51   _observer = observer;
52 }
53
54 BL::Job *
55 BL::JobsManager::createJob(const std::string & name)
56 {
57   DEBTRACE("createJob BL::JobsManager");
58
59   BL::Job * new_job = NULL;
60
61   _thread_mutex_jobs_map.lock();
62   _jobs_it = _jobs.find(name);
63   if (_jobs_it == _jobs.end())
64   {
65     new_job = new BL::Job(name);
66
67     // Ajout dans la liste
68     _jobs[name] = new_job;
69   }
70   else
71     DEBTRACE("createJob Error !!!! Job already exist: " << name);
72
73   _thread_mutex_jobs_map.unlock();
74   return new_job;
75 }
76
77 void
78 BL::JobsManager::addJobToLauncher(const std::string & name)
79 {
80   DEBTRACE("addJobToLauncher BL::JobsManager");
81
82   _thread_mutex_jobs_map.lock();
83   _jobs_it = _jobs.find(name);
84   if (_jobs_it == _jobs.end())
85   {
86     // TODO: SHOULD SEND an exeception...
87     DEBMSG("[addJobToLauncher] failed, job was not found");
88   }
89   std::string result = "";
90   result = _salome_services->create_job(_jobs_it->second);
91
92   // Unlock is here to be sure that
93   // method setSalomeLauncherId is called if
94   // the creation is successfull
95   _thread_mutex_jobs_map.unlock();
96
97   if (_observer)
98     if (result != "")
99     {
100       _jobs_it->second->setState(BL::Job::NOT_CREATED);
101       _observer->sendEvent("create_job", "Error", name, result);
102     }
103     else
104       _observer->sendEvent("create_job", "Ok", name, "");
105 }
106
107 void
108 BL::JobsManager::removeJob(const std::string & name)
109 {
110   DEBTRACE("removeJob BL::JobsManager");
111   _thread_mutex_jobs_map.lock();
112   _jobs_it = _jobs.find(name);
113   if (_jobs_it != _jobs.end())
114   {
115     std::string result = "";
116     if (_jobs_it->second->getSalomeLauncherId() != -1)
117       result = _salome_services->delete_job(_jobs_it->second);
118
119     delete _jobs_it->second;
120     _jobs.erase(_jobs_it->first);
121
122     if (_observer)
123       if (result != "")
124         _observer->sendEvent("delete_job", "Error", name, result);
125       else
126         _observer->sendEvent("delete_job", "Ok", name, "");
127   }
128   else
129     DEBTRACE("removeJob Error !!!! Job does not exist: " << name);
130   _thread_mutex_jobs_map.unlock();
131 }
132
133 BL::Job *
134 BL::JobsManager::getJob(const std::string & name)
135 {
136   DEBTRACE("getJob BL::JobsManager called");
137   return _jobs[name];
138 }
139
140 std::map<std::string, BL::Job *> & 
141 BL::JobsManager::getJobs()
142 {
143   return _jobs;
144 }
145
146 bool 
147 BL::JobsManager::job_already_exist(const std::string & name)
148 {
149   bool result = true;
150
151   _thread_mutex_jobs_map.lock();
152   _jobs_it = _jobs.find(name);
153   if (_jobs_it == _jobs.end())
154     result = false;
155   _thread_mutex_jobs_map.unlock();
156
157   return result;
158 }
159
160 void
161 BL::JobsManager::start_job(const std::string & name)
162 {
163   DEBTRACE("BL::JobsManager::start_job called for job: " << name);
164   // Prepare Info for thread
165   BL::JobsManager::thread_info * ti = new thread_info();
166   ti->object_ptr = this;
167   ti->job_name = name;
168   omni_thread::create(BL::JobsManager::starting_job_thread, ti);
169
170 }
171
172 void 
173 BL::JobsManager::starting_job_thread(void * object_ptr)
174 {
175   DEBTRACE("starting_job_thread BL::JobsManager called");
176   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
177   BL::JobsManager * object = ti->object_ptr;
178   std::string job_name =  ti->job_name;
179
180   // Check job exits
181   object->_thread_mutex_jobs_map.lock();
182   object->_jobs_it = object->_jobs.find(job_name);
183   if (object->_jobs_it == object->_jobs.end())
184   {
185     DEBTRACE("BL::JobsManager::start_job job unknown: " << job_name);
186   }
187   else
188   {
189     BL::Job * job = object->getJob(job_name);
190     std::string result = object->_salome_services->start_job(job);
191     if (result == "")
192     {
193       job->setState(BL::Job::QUEUED);
194       job->setThreadState(BL::Job::NOTHING);
195       if (object->_observer)
196         object->_observer->sendEvent("start_job", "Ok", job_name, "");
197     }
198     else
199     {
200       job->setState(BL::Job::ERROR);
201       job->setThreadState(BL::Job::NOTHING);
202       if (object->_observer)
203         object->_observer->sendEvent("start_job", "Error", job_name, result);
204     }
205   }
206   object->_thread_mutex_jobs_map.unlock();
207 }
208
209 void
210 BL::JobsManager::stop_job(const std::string & name)
211 {
212   DEBTRACE("stop_job BL::JobsManager called");
213
214   _thread_mutex_jobs_map.lock();
215   // Check job exits
216   _jobs_it = _jobs.find(name);
217   if (_jobs_it == _jobs.end())
218   {
219     DEBTRACE("BL::JobsManager::stop_job job unknown: " << name);
220     _thread_mutex_jobs_map.unlock();
221     return;
222   }
223   else
224   {
225     // Prepare Info for thread
226     BL::JobsManager::thread_info * ti = new thread_info();
227     ti->object_ptr = this;
228     ti->job_name = name;
229     omni_thread::create(BL::JobsManager::stop_job_thread, ti);
230   }
231 }
232
233 void 
234 BL::JobsManager::stop_job_thread(void * object_ptr)
235 {
236   DEBTRACE("stop_job_thread BL::JobsManager called");
237   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
238   BL::JobsManager * object = ti->object_ptr;
239   std::string job_name =  ti->job_name;
240   BL::Job * job = object->getJob(job_name);
241
242   std::string result = object->_salome_services->stop_job(job);
243
244   // End
245   if (result == "")
246   {
247     if (object->_observer)
248       object->_observer->sendEvent("stop_job", "Ok", job_name, "");
249   }
250   else
251   {
252     if (object->_observer)
253       object->_observer->sendEvent("stop_job", "Error", job_name, result);
254   }
255   object->_thread_mutex_jobs_map.unlock();
256 }
257
258 void
259 BL::JobsManager::refresh_jobs()
260 {
261   DEBTRACE("refresh_jobs BL::JobsManager called");
262
263   omni_thread::create(BL::JobsManager::refresh_jobs_thread, this);
264 }
265
266 void
267 BL::JobsManager::refresh_jobs_thread(void * object_ptr)
268 {
269   DEBTRACE("refresh_job BL::JobsManager called");
270   BL::JobsManager * object = reinterpret_cast<BL::JobsManager*>(object_ptr);
271
272   //iterate on all jobs
273   object->_thread_mutex_jobs_map.lock();
274   _jobs_map::iterator jobs_it;
275   jobs_it = object->_jobs.begin();
276   for(;jobs_it != object->_jobs.end();jobs_it++)
277   {
278     BL::Job * job = jobs_it->second;
279     if (job->getSalomeLauncherId() != -1)
280     {
281       if (job->getThreadState() == BL::Job::NOTHING)
282       {
283         BL::Job::State job_state = job->getState();
284         if (job_state != BL::Job::FINISHED &&
285             job_state != BL::Job::ERROR    &&
286             job_state != BL::Job::FAILED   &&
287             job_state != BL::Job::NOT_CREATED)
288         {
289           std::string result_launcher = object->_salome_services->refresh_job(job);
290           std::string result_job = job->setStringState(result_launcher);
291           if (result_job == "RefreshError")
292           {
293             // Error using launcher...
294             if (object->_observer)
295               object->_observer->sendEvent("refresh_job", "Error", job->getName(), result_launcher);
296           }
297           else if (result_job != "")
298           {
299             if (object->_observer)
300                         {
301               object->_observer->sendEvent("refresh_job", "Ok", job->getName(), result_job);
302
303                           // get assigned hostnames when the job will start
304                           if (result_job == "RUNNING")
305                           {
306                                 std::string assigned_hostnames = object->_salome_services->get_assigned_hostnames(job);
307                                 if (assigned_hostnames != "")
308                                 {
309                                         // sent event "get_assigned_hostnames"
310                                         object->_observer->sendEvent("get_assigned_hostnames", "Ok", job->getName(), assigned_hostnames);
311                                 }
312                           }
313                         }
314           }
315         }
316       }
317     }
318   }
319   object->_thread_mutex_jobs_map.unlock();
320 }
321
322 void
323 BL::JobsManager::get_results_job(const std::string & name)
324 {
325   DEBTRACE("get_results_job BL::JobsManager called");
326
327   _thread_mutex_jobs_map.lock();
328   // Check job exits
329   _jobs_it = _jobs.find(name);
330   if (_jobs_it == _jobs.end())
331   {
332     DEBTRACE("BL::JobsManager::get_results_job job unknown: " << name);
333     _thread_mutex_jobs_map.unlock();
334     return;
335   }
336   else
337   {
338     BL::Job * job = _jobs_it->second; 
339     if (job->getState() == BL::Job::FINISHED ||
340         job->getState() == BL::Job::ERROR    ||
341         job->getState() == BL::Job::FAILED
342         )
343     {
344       // Prepare Info for thread
345       BL::JobsManager::thread_info * ti = new thread_info();
346       ti->object_ptr = this;
347       ti->job_name = name;
348       omni_thread::create(BL::JobsManager::get_results_job_thread, ti);
349     }
350     else
351     {
352       DEBTRACE("BL::JobsManager::get_results_job job bad job state !");
353       _thread_mutex_jobs_map.unlock();
354       return;
355     }
356   }
357 }
358
359 void 
360 BL::JobsManager::get_results_job_thread(void * object_ptr)
361 {
362   DEBTRACE("get_results_job_thread BL::JobsManager called");
363   BL::JobsManager::thread_info * ti = reinterpret_cast<BL::JobsManager::thread_info*>(object_ptr);
364   BL::JobsManager * object = ti->object_ptr;
365   std::string job_name =  ti->job_name;
366   BL::Job * job = object->getJob(job_name);
367
368   std::string result = object->_salome_services->get_results_job(job);
369
370   // End
371   if (result == "")
372   {
373     if (object->_observer)
374       object->_observer->sendEvent("get_results_job", "Ok", job_name, "");
375   }
376   else
377   {
378     if (object->_observer)
379       object->_observer->sendEvent("get_results_job", "Error", job_name, result);
380   }
381   object->_thread_mutex_jobs_map.unlock();
382 }
383
384 void
385 BL::JobsManager::save_jobs(const std::string & xml_file)
386 {
387   DEBTRACE("BL::JobsManager::save_jobs called for : " << xml_file);
388
389   // Prepare Info for thread
390   BL::JobsManager::thread_info_file * ti = new thread_info_file();
391   ti->object_ptr = this;
392   ti->file_name = xml_file;
393   omni_thread::create(BL::JobsManager::save_jobs_thread, ti);
394 }
395
396 void
397 BL::JobsManager::load_jobs(const std::string & xml_file)
398 {
399   DEBTRACE("BL::JobsManager::load_jobs called for : " << xml_file);
400
401   // Prepare Info for thread
402   BL::JobsManager::thread_info_file * ti = new thread_info_file();
403   ti->object_ptr = this;
404   ti->file_name = xml_file;
405   omni_thread::create(BL::JobsManager::load_jobs_thread, ti);
406 }
407
408 void
409 BL::JobsManager::save_jobs_thread(void * object_ptr)
410 {
411   DEBTRACE("save_jobs_thread BL::JobsManager called");
412   BL::JobsManager::thread_info_file * ti = reinterpret_cast<BL::JobsManager::thread_info_file*>(object_ptr);
413   BL::JobsManager * object = ti->object_ptr;
414   std::string file_name =  ti->file_name;
415
416   object->_thread_mutex_jobs_map.lock();
417   std::string result = object->_salome_services->save_jobs(file_name);
418   object->_thread_mutex_jobs_map.unlock();
419
420   if (result != "")
421     if (object->_observer)
422       object->_observer->sendEvent("save_jobs", "Error", "", result);
423 }
424
425 void
426 BL::JobsManager::load_jobs_thread(void * object_ptr)
427 {
428   DEBTRACE("load_jobs_thread BL::JobsManager called");
429   BL::JobsManager::thread_info_file * ti = reinterpret_cast<BL::JobsManager::thread_info_file*>(object_ptr);
430   BL::JobsManager * object = ti->object_ptr;
431   std::string file_name =  ti->file_name;
432
433   object->_thread_mutex_jobs_map.lock();
434   std::string result = object->_salome_services->load_jobs(file_name);
435   object->_thread_mutex_jobs_map.unlock();
436
437   if (result != "")
438     if (object->_observer)
439       object->_observer->sendEvent("load_jobs", "Error", "", result);
440 }
441
442 void
443 BL::JobsManager::launcher_event_save_jobs(const std::string & data)
444 {
445   if (_observer)
446     _observer->sendEvent("save_jobs", "Ok", "", data);
447 }
448
449 void
450 BL::JobsManager::launcher_event_load_jobs(const std::string & data)
451 {
452   if (_observer)
453     _observer->sendEvent("load_jobs", "Ok", "", data);
454 }
455
456 void
457 BL::JobsManager::launcher_event_new_job(const std::string & data)
458 {
459   int job_number;
460   std::istringstream job_number_stream(data);
461   if (job_number_stream >> job_number)
462   {
463         DEBTRACE("Receive NEWJOB:" << job_number);
464     BL::JobsManager::thread_info_new_job * ti = new thread_info_new_job();
465     ti->object_ptr = this;
466     ti->job_number = job_number;
467     omni_thread::create(BL::JobsManager::launcher_event_new_job_thread, ti);
468   }
469 }
470
471 void
472 BL::JobsManager::launcher_event_update_job_state(const std::string & data)
473 {
474   refresh_jobs();
475 }
476
477 void
478 BL::JobsManager::launcher_event_new_job_thread(void * object_ptr)
479 {
480   DEBTRACE("Start of BL::JobsManager::launcher_event_new_job_thread");
481   BL::JobsManager::thread_info_new_job * ti = reinterpret_cast<BL::JobsManager::thread_info_new_job*>(object_ptr);
482   BL::JobsManager * object = ti->object_ptr;
483   int job_number =  ti->job_number;
484
485   object->_thread_mutex_jobs_map.lock();
486
487   // 1: Check if job is not already on our map
488   bool job_in_map = false;
489   _jobs_map::iterator jobs_it;
490   jobs_it = object->_jobs.begin();
491   for(;jobs_it != object->_jobs.end();jobs_it++)
492   {
493     BL::Job * job = jobs_it->second;
494     if (job->getSalomeLauncherId() == job_number)
495       job_in_map = true;
496   }
497
498   if (!job_in_map)
499   {
500     // 2: We try to get job informations
501
502     BL::Job * new_job = object->_salome_services->get_new_job(job_number);
503
504     // 3: We add it
505     if (new_job)
506     {
507       // 4: Check if job has a name or if the name already exists
508       if (new_job->getName() == "")
509       {
510         std::ostringstream name_stream;
511         name_stream << "no_name_" << object->_name_counter;
512         object->_name_counter++;
513         new_job->setName(name_stream.str());
514       }
515
516       _jobs_map::iterator _jobs_it_name = object->_jobs.find(new_job->getName());
517       if (_jobs_it_name != object->_jobs.end())
518       {
519         std::ostringstream name_stream;
520         name_stream << new_job->getName() << "_" << object->_name_counter;
521         object->_name_counter++;
522         new_job->setName(name_stream.str());
523       }
524       // 5: Insert job
525       object->_jobs[new_job->getName()] = new_job;
526       if (object->_observer)
527         object->_observer->sendEvent("add_job", "Ok", new_job->getName(), "");
528     }
529   }
530
531   object->_thread_mutex_jobs_map.unlock();
532 }
533
534 void
535 BL::JobsManager::launcher_event_remove_job(const std::string & data)
536 {
537   int job_number;
538   std::istringstream job_number_stream(data);
539   if (job_number_stream >> job_number)
540   {
541     BL::JobsManager::thread_info_new_job * ti = new thread_info_new_job();
542     ti->object_ptr = this;
543     ti->job_number = job_number;
544     omni_thread::create(BL::JobsManager::launcher_event_remove_job_thread, ti);
545   }
546 }
547
548 void
549 BL::JobsManager::launcher_event_remove_job_thread(void * object_ptr)
550 {
551   DEBTRACE("Start of BL::JobsManager::launcher_event_remove_job_thread");
552   BL::JobsManager::thread_info_new_job * ti = reinterpret_cast<BL::JobsManager::thread_info_new_job*>(object_ptr);
553   BL::JobsManager * object = ti->object_ptr;
554   int job_number =  ti->job_number;
555
556   object->_thread_mutex_jobs_map.lock();
557
558   _jobs_map::iterator jobs_it;
559   jobs_it = object->_jobs.begin();
560   for(;jobs_it != object->_jobs.end();jobs_it++)
561   {
562     BL::Job * job = jobs_it->second;
563     if (job->getSalomeLauncherId() == job_number)
564     {
565       job->setSalomeLauncherId(-1);
566       if (object->_observer)
567         object->_observer->sendEvent("to_remove_job", "Ok", job->getName(), "");
568     }
569   }
570
571   object->_thread_mutex_jobs_map.unlock();
572 }