Salome HOME
Rewrite job persistence to facilitate maintainance and further evolutions
[modules/kernel.git] / src / Launcher / SALOME_Launcher.cxx
1 // Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #include "SALOME_Launcher.hxx"
24 #include "BatchTest.hxx"
25 #include "OpUtil.hxx"
26 #include "SALOME_ContainerManager.hxx"
27 #include "Utils_CorbaException.hxx"
28
29
30 #include "Launcher_Job_Command.hxx"
31 #include "Launcher_Job_YACSFile.hxx"
32 #include "Launcher_Job_PythonSALOME.hxx"
33
34 #ifdef WIN32
35 # include <process.h>
36 #else
37 # include <unistd.h>
38 #endif
39 #include <sys/types.h>
40 #include <vector>
41 #include <list>
42
43 #include <stdio.h>
44 #include <sstream>
45
46 using namespace std;
47
48 const char *SALOME_Launcher::_LauncherNameInNS = "/SalomeLauncher";
49
50 //=============================================================================
51 /*! 
52  *  Constructor
53  *  \param orb
54  */
55 //=============================================================================
56 SALOME_Launcher::SALOME_Launcher(CORBA::ORB_ptr orb, PortableServer::POA_var poa) : _l()
57 {
58   MESSAGE("SALOME_Launcher constructor");
59   _NS = new SALOME_NamingService(orb);
60   _ResManager = new SALOME_ResourcesManager(orb,poa,_NS);
61   _l.SetResourcesManager(_ResManager->GetImpl());
62   _ContManager = new SALOME_ContainerManager(orb,poa,_ResManager,_NS);
63   _ResManager->_remove_ref();
64   _ContManager->_remove_ref();
65
66   _orb = CORBA::ORB::_duplicate(orb) ;
67   _poa = PortableServer::POA::_duplicate(poa) ;
68   PortableServer::ObjectId_var id = _poa->activate_object(this);
69   CORBA::Object_var obj = _poa->id_to_reference(id);
70   Engines::SalomeLauncher_var refContMan = Engines::SalomeLauncher::_narrow(obj);
71
72   _NS->Register(refContMan,_LauncherNameInNS);
73   MESSAGE("SALOME_Launcher constructor end");
74 }
75
76 //=============================================================================
77 /*! 
78  * destructor
79  */
80 //=============================================================================
81 SALOME_Launcher::~SALOME_Launcher()
82 {
83   MESSAGE("SALOME_Launcher destructor");
84   delete _NS;
85   MESSAGE("SALOME_Launcher destructor end");
86 }
87
88
89 CORBA::Long 
90 SALOME_Launcher::createJob(const Engines::JobParameters & job_parameters)
91 {
92   std::string job_type = job_parameters.job_type.in();
93
94   if (job_type != "command" && job_type != "yacs_file" && job_type != "python_salome")
95   {
96     std::string message("SALOME_Launcher::createJob: bad job type: ");
97     message += job_type;
98     THROW_SALOME_CORBA_EXCEPTION(message.c_str(), SALOME::INTERNAL_ERROR);
99   }
100
101   Launcher::Job * new_job; // It is Launcher_cpp that is going to destroy it
102
103   if (job_type == "command")
104     new_job = new Launcher::Job_Command();
105   else if (job_type == "yacs_file")
106     new_job = new Launcher::Job_YACSFile();
107   else if (job_type == "python_salome")
108     new_job = new Launcher::Job_PythonSALOME();
109
110   // Name
111   new_job->setJobName(job_parameters.job_name.in());
112
113   // Directories
114   std::string work_directory = job_parameters.work_directory.in();
115   std::string local_directory = job_parameters.local_directory.in();
116   std::string result_directory = job_parameters.result_directory.in();
117   new_job->setWorkDirectory(work_directory);
118   new_job->setLocalDirectory(local_directory);
119   new_job->setResultDirectory(result_directory);
120
121   // Parameters for COORM
122   std::string launcher_file = job_parameters.launcher_file.in();
123   std::string launcher_args = job_parameters.launcher_args.in();
124   new_job->setLauncherFile(launcher_file);
125   new_job->setLauncherArgs(launcher_args);
126
127   // Job File
128   std::string job_file = job_parameters.job_file.in();
129   try
130   {
131     new_job->setJobFile(job_file);
132   }
133   catch(const LauncherException &ex)
134   {
135     INFOS(ex.msg.c_str());
136     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
137   }
138
139   // Files
140   std::string env_file = job_parameters.env_file.in();
141   new_job->setEnvFile(env_file);
142   for (CORBA::ULong i = 0; i < job_parameters.in_files.length(); i++)
143     new_job->add_in_file(job_parameters.in_files[i].in());
144   for (CORBA::ULong i = 0; i < job_parameters.out_files.length(); i++)
145     new_job->add_out_file(job_parameters.out_files[i].in());
146
147   // Expected During Time
148   try
149   {
150     std::string maximum_duration = job_parameters.maximum_duration.in();
151     new_job->setMaximumDuration(maximum_duration);
152   }
153   catch(const LauncherException &ex){
154     INFOS(ex.msg.c_str());
155     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
156   }
157
158   // Queue
159   std::string queue = job_parameters.queue.in();
160   new_job->setQueue(queue);
161
162   // Exclusive
163   new_job->setExclusive(job_parameters.exclusive);
164
165   // Memory required per CPU
166   new_job->setMemPerCpu(job_parameters.mem_per_cpu);
167
168   // Resources requirements
169   try
170   {
171     resourceParams p;
172     p.name = job_parameters.resource_required.name;
173     p.hostname = job_parameters.resource_required.hostname;
174     p.OS = job_parameters.resource_required.OS;
175     p.nb_proc = job_parameters.resource_required.nb_proc;
176     p.nb_node = job_parameters.resource_required.nb_node;
177     p.nb_proc_per_node = job_parameters.resource_required.nb_proc_per_node;
178     p.cpu_clock = job_parameters.resource_required.cpu_clock;
179     p.mem_mb = job_parameters.resource_required.mem_mb;
180     new_job->setResourceRequiredParams(p);
181   }
182   catch(const LauncherException &ex){
183     INFOS(ex.msg.c_str());
184     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
185   }
186
187   // Adding specific parameters to the job
188   for (CORBA::ULong i = 0; i < job_parameters.specific_parameters.length(); i++)
189     new_job->addSpecificParameter(job_parameters.specific_parameters[i].name.in(),
190                                   job_parameters.specific_parameters[i].value.in());
191   try
192   {
193     new_job->checkSpecificParameters();
194   }
195   catch(const LauncherException &ex)
196   {
197     INFOS(ex.msg.c_str());
198     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
199   }
200
201   try
202   {
203     _l.createJob(new_job);
204     std::ostringstream job_id;
205     job_id << new_job->getNumber();
206     notifyObservers("NEW_JOB", job_id.str());
207   }
208   catch(const LauncherException &ex)
209   {
210     INFOS(ex.msg.c_str());
211     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
212   }
213   return new_job->getNumber();
214 }
215
216 void 
217 SALOME_Launcher::launchJob(CORBA::Long job_id)
218 {
219   try
220   {
221     _l.launchJob(job_id);
222   }
223   catch(const LauncherException &ex)
224   {
225     INFOS(ex.msg.c_str());
226     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
227   }
228 }
229
230 char *
231 SALOME_Launcher::getJobState(CORBA::Long job_id)
232 {
233   std::string result;
234   try
235   {
236     result = _l.getJobState(job_id);
237   }
238   catch(const LauncherException &ex)
239   {
240     INFOS(ex.msg.c_str());
241     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
242   }
243   return CORBA::string_dup(result.c_str());
244 }
245
246 // Get names or ids of hosts assigned to the job
247 char *
248 SALOME_Launcher::getAssignedHostnames(CORBA::Long job_id)
249 {
250   std::string result;
251   try
252   {
253     result = _l.getAssignedHostnames(job_id);
254   }
255   catch(const LauncherException &ex)
256   {
257     INFOS(ex.msg.c_str());
258     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
259   }
260   return CORBA::string_dup(result.c_str());
261 }
262
263 void
264 SALOME_Launcher::getJobResults(CORBA::Long job_id, const char * directory)
265 {
266   try
267   {
268     _l.getJobResults(job_id, directory);
269   }
270   catch(const LauncherException &ex)
271   {
272     INFOS(ex.msg.c_str());
273     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
274   }
275 }
276
277 CORBA::Boolean
278 SALOME_Launcher::getJobDumpState(CORBA::Long job_id, const char * directory)
279 {
280   CORBA::Boolean rtn = false;
281   try
282   {
283     rtn = _l.getJobDumpState(job_id, directory);
284   }
285   catch(const LauncherException &ex)
286   {
287     INFOS(ex.msg.c_str());
288     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
289   }
290   return rtn;
291 }
292
293 void 
294 SALOME_Launcher::removeJob(CORBA::Long job_id)
295 {
296   try
297   {
298     _l.removeJob(job_id);
299     std::ostringstream job_id_str;
300     job_id_str << job_id;
301     notifyObservers("REMOVE_JOB", job_id_str.str());
302   }
303   catch(const LauncherException &ex)
304   {
305     INFOS(ex.msg.c_str());
306     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
307   }
308 }
309
310 void 
311 SALOME_Launcher::stopJob(CORBA::Long job_id)
312 {
313   try
314   {
315     _l.stopJob(job_id);
316     std::ostringstream job_id_str;
317     job_id_str << job_id;
318     notifyObservers("UPDATE_JOB_STATE", job_id_str.str());
319   }
320   catch(const LauncherException &ex)
321   {
322     INFOS(ex.msg.c_str());
323     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
324   }
325 }
326
327 //=============================================================================
328 /*! CORBA Method:
329  *  Create a job in the launcher with a file
330  *  \param xmlExecuteFile     : .xml to parse that contains job description
331  *  \param clusterName        : machine choosed
332  */
333 //=============================================================================
334 CORBA::Long 
335 SALOME_Launcher::createJobWithFile(const char * xmlExecuteFile,
336                                    const char * clusterName)
337 {
338   CORBA::Long jobId;
339   try{
340     jobId = _l.createJobWithFile(xmlExecuteFile, clusterName);
341   }
342   catch(const LauncherException &ex){
343     INFOS(ex.msg.c_str());
344     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
345   }
346
347   return jobId;
348 }
349
350 //=============================================================================
351 /*! CORBA Method:
352  *  the test batch configuration 
353  *  \param params             : The batch cluster
354  */
355 //=============================================================================
356 CORBA::Boolean 
357 SALOME_Launcher::testBatch(const Engines::ResourceParameters& params)
358 {
359   MESSAGE("BEGIN OF SALOME_Launcher::testBatch");
360   CORBA::Boolean rtn = false;
361   try
362   {
363     // Consider only resources that can run batch jobs
364     Engines::ResourceParameters new_params(params);
365     new_params.can_launch_batch_jobs = true;
366
367     // find a resource matching the required parameters
368     Engines::ResourceList *aMachineList = _ResManager->GetFittingResources(new_params);
369     if (aMachineList->length() == 0)
370       throw SALOME_Exception("No resources have been found with your parameters");
371
372     const Engines::ResourceDefinition* p = _ResManager->GetResourceDefinition((*aMachineList)[0]);
373         std::string resource_name(p->name);
374     INFOS("Choose resource for test: " <<  resource_name);
375     
376     BatchTest t(*p);
377     if (t.test()) 
378     {
379       rtn = true;
380     }
381   }
382   catch(const LauncherException &ex){
383     INFOS(ex.msg.c_str());
384     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
385   }
386   return rtn;
387 }
388
389 //=============================================================================
390 /*! CORBA method:
391  *  shutdown all the containers, then the ContainerManager servant
392  */
393 //=============================================================================
394 void SALOME_Launcher::Shutdown()
395 {
396   MESSAGE("Shutdown");
397   _NS->Destroy_Name(_LauncherNameInNS);
398   _ContManager->Shutdown();
399   _ResManager->Shutdown();
400   PortableServer::ObjectId_var oid = _poa->servant_to_id(this);
401   _poa->deactivate_object(oid);
402   if(!CORBA::is_nil(_orb))
403     _orb->shutdown(0);
404 }
405
406 //=============================================================================
407 /*! CORBA Method:
408  *  Returns the PID of the process
409  */
410 //=============================================================================
411 CORBA::Long SALOME_Launcher::getPID()
412 {
413   return 
414 #ifndef WIN32
415     (CORBA::Long)getpid();
416 #else
417     (CORBA::Long)_getpid();
418 #endif
419 }
420
421 //=============================================================================
422 /*! CORBA Method:
423  *  Returns current launcher jobs list
424  */
425 //=============================================================================
426 Engines::JobsList *
427 SALOME_Launcher::getJobsList()
428 {
429   Engines::JobsList_var jobs_list = new Engines::JobsList();
430   std::map<int, Launcher::Job *> cpp_jobs = _l.getJobs();
431   std::map<int, Launcher::Job *>::const_iterator it_job;
432   int list_id = 0;
433   for(it_job = cpp_jobs.begin(); it_job != cpp_jobs.end(); it_job++)
434   {
435     int number          = it_job->first;
436     try
437     {
438       // Prepare CORBA job description
439       Engines::JobDescription_var job_descr = new Engines::JobDescription();
440       Engines::JobParameters_var job_parameters = getJobParameters(number);
441       job_descr->job_id = number;
442       job_descr->job_parameters = job_parameters;
443
444       // Add job description to the sequence
445       jobs_list->length(list_id + 1);
446       jobs_list[list_id] = job_descr;
447       list_id++;
448     }
449     catch (...) {}
450   }
451   return jobs_list._retn();
452 }
453
454 //=============================================================================
455 /*! CORBA Method:
456  * Returns the job description
457  */
458 //=============================================================================
459 Engines::JobParameters *
460 SALOME_Launcher::getJobParameters(CORBA::Long job_id)
461 {
462   std::map<int, Launcher::Job *> cpp_jobs = _l.getJobs();
463   std::map<int, Launcher::Job *>::const_iterator it_job = cpp_jobs.find(job_id);
464   if (it_job == cpp_jobs.end())
465   {
466     INFOS("Cannot find the job, is it created ? job number: " << job_id);
467     THROW_SALOME_CORBA_EXCEPTION("Job does not exist", SALOME::INTERNAL_ERROR);
468   }
469
470   Launcher::Job * job = it_job->second;
471   Engines::JobParameters_var job_parameters = new Engines::JobParameters;
472   job_parameters->job_name         = CORBA::string_dup(job->getJobName().c_str());
473   job_parameters->job_type         = CORBA::string_dup(job->getJobType().c_str());
474   job_parameters->job_file         = CORBA::string_dup(job->getJobFile().c_str());
475   job_parameters->env_file         = CORBA::string_dup(job->getEnvFile().c_str());
476   job_parameters->work_directory   = CORBA::string_dup(job->getWorkDirectory().c_str());
477   job_parameters->local_directory  = CORBA::string_dup(job->getLocalDirectory().c_str());
478   job_parameters->result_directory = CORBA::string_dup(job->getResultDirectory().c_str());
479
480   // Parameters for COORM
481   job_parameters->launcher_file = CORBA::string_dup(job->getLauncherFile().c_str());
482   job_parameters->launcher_args = CORBA::string_dup(job->getLauncherArgs().c_str());
483
484   int i = 0;
485   int j = 0;
486   std::list<std::string> in_files  = job->get_in_files();
487   std::list<std::string> out_files = job->get_out_files();
488   job_parameters->in_files.length(in_files.size());
489   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
490   {
491     job_parameters->in_files[i] = CORBA::string_dup((*it).c_str());
492     i++;
493   }
494   job_parameters->out_files.length(out_files.size());
495   for(std::list<std::string>::iterator it = out_files.begin(); it != out_files.end(); it++)
496   {
497     job_parameters->out_files[j] = CORBA::string_dup((*it).c_str());
498     j++;
499   }
500
501   job_parameters->maximum_duration = CORBA::string_dup(job->getMaximumDuration().c_str());
502   job_parameters->queue            = CORBA::string_dup(job->getQueue().c_str());
503   job_parameters->exclusive        = job->getExclusive();
504   job_parameters->mem_per_cpu      = job->getMemPerCpu();
505
506   resourceParams resource_params = job->getResourceRequiredParams();
507   job_parameters->resource_required.name             = CORBA::string_dup(resource_params.name.c_str());
508   job_parameters->resource_required.hostname         = CORBA::string_dup(resource_params.hostname.c_str());
509   job_parameters->resource_required.OS               = CORBA::string_dup(resource_params.OS.c_str());
510   job_parameters->resource_required.nb_proc          = resource_params.nb_proc;
511   job_parameters->resource_required.nb_node          = resource_params.nb_node;
512   job_parameters->resource_required.nb_proc_per_node = resource_params.nb_proc_per_node;
513   job_parameters->resource_required.cpu_clock        = resource_params.cpu_clock;
514   job_parameters->resource_required.mem_mb           = resource_params.mem_mb;
515
516   std::map<std::string, std::string> specific_parameters = job->getSpecificParameters();
517   if (!specific_parameters.empty())
518   {
519     job_parameters->specific_parameters.length(specific_parameters.size());
520     std::map<std::string, std::string>::const_iterator it_specific;
521     CORBA::ULong i = 0;
522     for (it_specific = specific_parameters.begin() ; it_specific != specific_parameters.end(); it_specific++)
523     {
524       Engines::Parameter_var new_param = new Engines::Parameter;
525       new_param->name  = CORBA::string_dup((it_specific->first).c_str());
526       new_param->value = CORBA::string_dup((it_specific->second).c_str());
527       job_parameters->specific_parameters[i] = new_param;
528       i++;
529     }
530   }
531
532   return job_parameters._retn();
533 }
534
535 //=============================================================================
536 /*! CORBA Method:
537  *  Loads jobs saved in jobs_file
538  */
539 //=============================================================================
540 void
541 SALOME_Launcher::loadJobs(const char* jobs_file)
542 {
543   list<int> new_jobs_id_list;
544   try
545   {
546     // Load the jobs in Launcher
547     new_jobs_id_list = _l.loadJobs(jobs_file);
548   }
549   catch (const LauncherException & ex)
550   {
551     INFOS(ex.msg.c_str());
552     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(), SALOME::INTERNAL_ERROR);
553   }
554
555   // Notify observers of the new jobs
556   list<int>::const_iterator it_jobs_id;
557   for (it_jobs_id = new_jobs_id_list.begin(); it_jobs_id != new_jobs_id_list.end(); it_jobs_id++)
558   {
559     ostringstream job_id_sstr;
560     job_id_sstr << *it_jobs_id;
561     notifyObservers("NEW_JOB", job_id_sstr.str());
562   }
563   notifyObservers("LOAD_JOBS", jobs_file);
564 }
565
566 //=============================================================================
567 /*! CORBA Method:
568  *  Save jobs of Launcher (in any steps) in file jobs_file
569  */
570 //=============================================================================
571 void
572 SALOME_Launcher::saveJobs(const char* jobs_file)
573 {
574   _l.saveJobs(jobs_file);
575   notifyObservers("SAVE_JOBS", jobs_file);
576 }
577
578 //=============================================================================
579 /*! CORBA Method:
580  *  Add a new observer to the launcher
581  */
582 //=============================================================================
583 void
584 SALOME_Launcher::addObserver(Engines::SalomeLauncherObserver_ptr observer)
585 {
586   bool new_observer = true;
587   std::list<Engines::SalomeLauncherObserver_var>::iterator iter = _observers.begin();
588   while(iter != _observers.end())
589   {
590     if (std::string(_orb->object_to_string(*iter)) ==
591         std::string(_orb->object_to_string(observer)))
592     {
593       new_observer = false;
594       break;
595     }
596     iter++;
597   }
598   if (new_observer)
599     _observers.push_back(Engines::SalomeLauncherObserver::_duplicate(observer));
600
601   // We notify the new observer with all jobs that are currently in the Launcher
602   std::map<int, Launcher::Job *> cpp_jobs = _l.getJobs();
603   std::map<int, Launcher::Job *>::const_iterator it_job;
604   for(it_job = cpp_jobs.begin(); it_job != cpp_jobs.end(); it_job++)
605   {
606     int number = it_job->first;
607     std::ostringstream job_id;
608     job_id << number;
609     try
610     {
611       observer->notify("NEW_JOB", job_id.str().c_str());
612     }
613     catch (...) 
614     {
615        MESSAGE("Notify Observer, exception catch");
616     }
617
618   }
619 }
620
621 //=============================================================================
622 /*! CORBA Method:
623  *  Add a new observer to the launcher
624  */
625 //=============================================================================
626 void
627 SALOME_Launcher::removeObserver(Engines::SalomeLauncherObserver_ptr observer)
628 {
629   std::list<Engines::SalomeLauncherObserver_var>::iterator iter = _observers.begin();
630   while(iter != _observers.end())
631   {
632     if (std::string(_orb->object_to_string(*iter)) ==
633         std::string(_orb->object_to_string(observer)))
634     {
635       // Observer found
636       iter =_observers.erase(iter++);
637     }
638     else
639     {
640       iter++;
641     }
642   }
643 }
644
645 //=============================================================================
646 /*! Internal Method:
647  *  Notify observers on a new event
648  */
649 //=============================================================================
650 void
651 SALOME_Launcher::notifyObservers(const std::string & event_name,
652                                  const std::string & event_data)
653 {
654   std::list<Engines::SalomeLauncherObserver_var>::iterator iter = _observers.begin();
655   while(iter != _observers.end())
656   {
657     try
658     {
659       (*iter)->notify(CORBA::string_dup(event_name.c_str()),
660                       CORBA::string_dup(event_data.c_str()));
661     }
662     catch (...) 
663     {
664        MESSAGE("Notify Observer, exception catch");
665     }
666     iter++;
667   }
668
669 }