Salome HOME
Integrate developments from N. Toukourou at INRIA (OAR and CooRM support)
[modules/kernel.git] / src / Launcher / Launcher.cxx
1 // Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #ifdef WITH_LIBBATCH
24 #include <libbatch/BatchManagerCatalog.hxx>
25 #include <libbatch/FactBatchManager.hxx>
26 #include <libbatch/BatchManager.hxx>
27 #endif
28
29 #include "Basics_Utils.hxx"
30 #include "Basics_DirUtils.hxx"
31 #include "SALOME_Launcher_Handler.hxx"
32 #include "Launcher.hxx"
33 #include "Launcher_Job_Command.hxx"
34 #include <iostream>
35 #include <sstream>
36 #include <sys/stat.h>
37 #include <time.h>
38
39 //=============================================================================
40 /*! 
41  *  Constructor
42  *  \param orb
43  *  Define a CORBA single thread policy for the server, which avoid to deal
44  *  with non thread-safe usage like Change_Directory in SALOME naming service
45  */
46 //=============================================================================
47 Launcher_cpp::Launcher_cpp()
48 {
49   LAUNCHER_MESSAGE("Launcher_cpp constructor");
50   _job_cpt = 0;
51   _job_cpt_mutex = new pthread_mutex_t();
52   pthread_mutex_init(_job_cpt_mutex, NULL);
53 }
54
55 //=============================================================================
56 /*! 
57  * destructor
58  */
59 //=============================================================================
60 Launcher_cpp::~Launcher_cpp()
61 {
62   LAUNCHER_MESSAGE("Launcher_cpp destructor");
63 #ifdef WITH_LIBBATCH
64   std::map<int, Launcher::Job *>::const_iterator it_job;
65   for(it_job = _launcher_job_map.begin(); it_job != _launcher_job_map.end(); it_job++)
66     delete it_job->second;
67   std::map <int, Batch::BatchManager * >::const_iterator it1;
68   for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
69     delete it1->second;
70 #endif
71
72   pthread_mutex_destroy(_job_cpt_mutex);
73   delete _job_cpt_mutex;
74 }
75
76 #ifdef WITH_LIBBATCH
77
78 //=============================================================================
79 /*!
80  * Add a job into the launcher - check resource and choose one 
81  */ 
82 //=============================================================================
83 void 
84 Launcher_cpp::createJob(Launcher::Job * new_job)
85 {
86   LAUNCHER_MESSAGE("Creating a new job");
87   // Add job to the jobs map
88   pthread_mutex_lock(_job_cpt_mutex);
89   new_job->setNumber(_job_cpt);
90   _job_cpt++;
91   pthread_mutex_unlock(_job_cpt_mutex);
92   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
93   if (it_job == _launcher_job_map.end())
94     _launcher_job_map[new_job->getNumber()] = new_job;
95   else
96   {
97     LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
98     delete new_job;
99     throw LauncherException("A job as already the same id - job is not created !");
100   }
101   LAUNCHER_MESSAGE("New Job created");
102 }
103
104 //=============================================================================
105 /*!
106  * Launch a job 
107  */ 
108 //=============================================================================
109 void 
110 Launcher_cpp::launchJob(int job_id)
111 {
112   LAUNCHER_MESSAGE("Launch a job");
113
114   // Check if job exist
115   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
116   if (it_job == _launcher_job_map.end())
117   {
118     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
119     throw LauncherException("Cannot find the job, is it created ?");
120   }
121
122   Launcher::Job * job = it_job->second;
123
124   // Check job state (cannot launch a job already launched...)
125   if (job->getState() != "CREATED")
126   {
127     LAUNCHER_INFOS("Bad state of the job: " << job->getState());
128     throw LauncherException("Bad state of the job: " + job->getState());
129   }
130
131   // Third step search batch manager for the job into the map -> instanciate one if does not exist
132 #ifdef WITH_LIBBATCH
133   std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
134   if(it == _batchmap.end())
135   {
136     createBatchManagerForJob(job);
137   }
138 #endif
139
140   try {
141     Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
142     job->setBatchManagerJobId(batch_manager_job_id);
143     job->setState("QUEUED");
144   }
145   catch(const Batch::GenericException &ex)
146   {
147     LAUNCHER_INFOS("Job is not launched, exception in submitJob: " << ex.message);
148     throw LauncherException(ex.message.c_str());
149   }
150   LAUNCHER_MESSAGE("Job launched");
151 }
152
153 //=============================================================================
154 /*!
155  * Get job state
156  */ 
157 //=============================================================================
158 const char *
159 Launcher_cpp::getJobState(int job_id)
160 {
161   LAUNCHER_MESSAGE("Get job state");
162
163   // Check if job exist
164   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
165   if (it_job == _launcher_job_map.end())
166   {
167     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
168     throw LauncherException("Cannot find the job, is it created ?");
169   }
170
171   Launcher::Job * job = it_job->second;
172
173   std::string state;
174   try
175   {
176     state = job->updateJobState();
177   }
178   catch(const Batch::GenericException &ex)
179   {
180     LAUNCHER_INFOS("getJobState failed, exception: " << ex.message);
181     throw LauncherException(ex.message.c_str());
182   }
183
184   return state.c_str();
185 }
186
187 //=============================================================================
188 /*!
189  * Get job assigned hostnames
190  */
191 //=============================================================================
192 const char *
193 Launcher_cpp::getAssignedHostnames(int job_id)
194 {
195   LAUNCHER_MESSAGE("Get job assigned hostnames");
196
197   // Check if job exist
198   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
199   if (it_job == _launcher_job_map.end())
200   {
201     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
202     throw LauncherException("Cannot find the job, is it created ?");
203   }
204
205   Launcher::Job * job = it_job->second;
206   std::string assigned_hostnames = job->getAssignedHostnames();
207
208   return assigned_hostnames.c_str();
209 }
210
211 //=============================================================================
212 /*!
213  * Get Job result - the result directory could be changed
214  */ 
215 //=============================================================================
216 void
217 Launcher_cpp::getJobResults(int job_id, std::string directory)
218 {
219   LAUNCHER_MESSAGE("Get Job results");
220
221   // Check if job exist
222   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
223   if (it_job == _launcher_job_map.end())
224   {
225     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
226     throw LauncherException("Cannot find the job, is it created ?");
227   }
228
229   Launcher::Job * job = it_job->second;
230   std::string resource_name = job->getResourceDefinition().Name;
231   try 
232   {
233     if (directory != "")
234       _batchmap[job_id]->importOutputFiles(*(job->getBatchJob()), directory);
235     else
236       _batchmap[job_id]->importOutputFiles(*(job->getBatchJob()), job->getResultDirectory());
237   }
238   catch(const Batch::GenericException &ex)
239   {
240     LAUNCHER_INFOS("getJobResult is maybe incomplete, exception: " << ex.message);
241     throw LauncherException(ex.message.c_str());
242   }
243   LAUNCHER_MESSAGE("getJobResult ended");
244 }
245
246 //=============================================================================
247 /*!
248  * Get Job dump state - the result directory could be changed
249  */ 
250 //=============================================================================
251 bool
252 Launcher_cpp::getJobDumpState(int job_id, std::string directory)
253 {
254   bool rtn;
255   LAUNCHER_MESSAGE("Get Job dump state");
256
257   // Check if job exist
258   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
259   if (it_job == _launcher_job_map.end())
260   {
261     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
262     throw LauncherException("Cannot find the job, is it created ?");
263   }
264
265   Launcher::Job * job = it_job->second;
266   std::string resource_name = job->getResourceDefinition().Name;
267   try 
268   {
269     if (directory != "")
270       rtn = _batchmap[job_id]->importDumpStateFile(*(job->getBatchJob()), directory);
271     else
272       rtn = _batchmap[job_id]->importDumpStateFile(*(job->getBatchJob()), job->getResultDirectory());
273   }
274   catch(const Batch::GenericException &ex)
275   {
276     LAUNCHER_INFOS("getJobResult is maybe incomplete, exception: " << ex.message);
277     throw LauncherException(ex.message.c_str());
278   }
279   LAUNCHER_MESSAGE("getJobResult ended");
280   return rtn;
281 }
282
283 //=============================================================================
284 /*!
285  * Remove the job - into the Launcher and its batch manager
286  */ 
287 //=============================================================================
288 void
289 Launcher_cpp::removeJob(int job_id)
290 {
291   LAUNCHER_MESSAGE("Remove Job");
292
293   // Check if job exist
294   std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
295   if (it_job == _launcher_job_map.end())
296   {
297     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
298     throw LauncherException("Cannot find the job, is it created ?");
299   }
300
301   it_job->second->removeJob();
302   delete it_job->second;
303   _launcher_job_map.erase(it_job);
304 }
305
306 //=============================================================================
307 /*!
308  * stop the job
309  */ 
310 //=============================================================================
311 void
312 Launcher_cpp::stopJob(int job_id)
313 {
314   LAUNCHER_MESSAGE("Stop Job");
315
316   // Check if job exist
317   std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
318   if (it_job == _launcher_job_map.end())
319   {
320     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
321     throw LauncherException("Cannot find the job, is it created ?");
322   }
323
324   it_job->second->stopJob();
325 }
326
327 //=============================================================================
328 /*! 
329  *  create a launcher job based on a file
330  *  \param xmlExecuteFile     : to define the execution on the batch cluster
331  */
332 //=============================================================================
333 long 
334 Launcher_cpp::createJobWithFile(const std::string xmlExecuteFile, 
335                                 const std::string clusterName)
336 {
337   LAUNCHER_MESSAGE("Begin of Launcher_cpp::createJobWithFile");
338
339   // Parsing xml file
340   ParserLauncherType job_params = ParseXmlFile(xmlExecuteFile);
341
342   // Creating a new job
343   Launcher::Job_Command * new_job = new Launcher::Job_Command();
344
345   std::string cmdFile = Kernel_Utils::GetTmpFileName();  
346 #ifndef WIN32
347   cmdFile += ".sh";
348 #else
349   cmdFile += ".bat";
350 #endif
351   std::ofstream os;
352   os.open(cmdFile.c_str(), std::ofstream::out );
353   os << "#! /bin/sh" << std::endl;
354   os << job_params.Command;
355   os.close();
356
357   new_job->setJobFile(cmdFile);
358   new_job->setLocalDirectory(job_params.RefDirectory);
359   new_job->setWorkDirectory(job_params.MachinesList[clusterName].WorkDirectory);
360   new_job->setEnvFile(job_params.MachinesList[clusterName].EnvFile);
361
362   for(int i=0; i < job_params.InputFile.size(); i++)
363     new_job->add_in_file(job_params.InputFile[i]);
364   for(int i=0; i < job_params.OutputFile.size();i++)
365     new_job->add_out_file(job_params.OutputFile[i]);
366
367   resourceParams p;
368   p.hostname = clusterName;
369   p.name = "";
370   p.OS = "";
371   p.nb_proc = job_params.NbOfProcesses;
372   p.nb_node = 0;
373   p.nb_proc_per_node = 0;
374   p.cpu_clock = 0;
375   p.mem_mb = 0;
376   new_job->setResourceRequiredParams(p);
377
378   createJob(new_job);
379   return new_job->getNumber();
380 }
381
382 //=============================================================================
383 /*!
384  *  Factory to instanciate the good batch manager for choosen cluster.
385  */ 
386 //=============================================================================
387 Batch::BatchManager *
388 Launcher_cpp::FactoryBatchManager(ParserResourcesType& params)
389 {
390   std::string mpi;
391   Batch::CommunicationProtocolType protocol;
392   Batch::FactBatchManager * fact;
393
394   std::string hostname = params.HostName;
395
396   switch(params.Protocol)
397   {
398     case sh:
399       protocol = Batch::SH;
400       break;
401     case rsh:
402       protocol = Batch::RSH;
403       break;
404     case ssh:
405       protocol = Batch::SSH;
406       break;
407     default:
408       throw LauncherException("Unknown protocol for this resource");
409       break;
410   }
411
412   switch(params.mpi)
413   {
414     case lam:
415       mpi = "lam";
416       break;
417     case mpich1:
418       mpi = "mpich1";
419       break;
420     case mpich2:
421       mpi = "mpich2";
422       break;
423     case openmpi:
424       mpi = "openmpi";
425       break;
426     case ompi:
427       mpi = "ompi";
428       break;
429     case slurmmpi:
430       mpi = "slurmmpi";
431       break;
432     case prun:
433       mpi = "prun";
434       break;
435     default:
436       mpi = "nompi";
437   }
438
439   const char * bmType;
440   switch( params.Batch )
441   {
442     case pbs:
443       bmType = "PBS";
444       break;
445     case lsf:
446       bmType = "LSF";
447       break;
448     case sge:
449       bmType = "SGE";
450       break;
451     case ccc:
452       bmType = "CCC";
453       break;
454     case slurm:
455       bmType = "SLURM";
456       break;
457     case ssh_batch:
458       bmType = "LOCAL";
459       break;
460     case ll:
461       bmType = "LL";
462       break;
463     case vishnu:
464       bmType = "VISHNU";
465       break;
466     case oar:
467       bmType = "OAR";
468       break;
469     case coorm:
470       bmType = "COORM";
471       break;
472     default:
473       LAUNCHER_MESSAGE("Bad batch description of the resource: Batch = " << params.Batch);
474       throw LauncherException("No batchmanager for that cluster - Bad batch description of the resource");
475   }
476   Batch::BatchManagerCatalog & cata = Batch::BatchManagerCatalog::getInstance();
477   fact = dynamic_cast<Batch::FactBatchManager*>(cata(bmType));
478   if (fact == NULL) {
479     LAUNCHER_MESSAGE("Cannot find batch manager factory for " << bmType << ". Check your version of libBatch.");
480     throw LauncherException("Cannot find batch manager factory");
481   }
482   LAUNCHER_MESSAGE("Instanciation of batch manager of type: " << bmType);
483   Batch::BatchManager * batch_client = (*fact)(hostname.c_str(), params.UserName.c_str(),
484                                                protocol, mpi.c_str());
485   return batch_client;
486 }
487
488 //----------------------------------------------------------
489 // Without LIBBATCH - Launcher_cpp do nothing...
490 //----------------------------------------------------------
491 #else
492
493 void 
494 Launcher_cpp::createJob(Launcher::Job * new_job)
495 {
496   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot create a job !!!");
497   delete new_job;
498   throw LauncherException("Method Launcher_cpp::createJob is not available "
499                           "(libBatch was not present at compilation time)");
500 }
501
502 void 
503 Launcher_cpp::launchJob(int job_id)
504 {
505   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot launch a job !!!");
506   throw LauncherException("Method Launcher_cpp::launchJob is not available "
507                           "(libBatch was not present at compilation time)");
508 }
509
510 const char *
511 Launcher_cpp::getJobState(int job_id)
512 {
513   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job state!!!");
514   throw LauncherException("Method Launcher_cpp::getJobState is not available "
515                           "(libBatch was not present at compilation time)");
516 }
517
518 void
519 Launcher_cpp::getJobResults(int job_id, std::string directory)
520 {
521   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job results!!!");
522   throw LauncherException("Method Launcher_cpp::getJobResults is not available "
523                           "(libBatch was not present at compilation time)");
524 }
525
526 bool
527 Launcher_cpp::getJobDumpState(int job_id, std::string directory)
528 {
529   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job dump state!!!");
530   throw LauncherException("Method Launcher_cpp::getJobDumpState is not available "
531                           "(libBatch was not present at compilation time)");
532 }
533
534 void
535 Launcher_cpp::removeJob(int job_id)
536 {
537   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot remove job!!!");
538   throw LauncherException("Method Launcher_cpp::removeJob is not available "
539                           "(libBatch was not present at compilation time)");
540 }
541
542 void
543 Launcher_cpp::stopJob(int job_id)
544 {
545   throw LauncherException("Method Launcher_cpp::stopJob is not available "
546                           "(libBatch was not present at compilation time)");
547 }
548
549 long 
550 Launcher_cpp::createJobWithFile( const std::string xmlExecuteFile, std::string clusterName)
551 {
552   throw LauncherException("Method Launcher_cpp::createJobWithFile is not available "
553                           "(libBatch was not present at compilation time)");
554   return 0;
555 }
556
557 #endif
558
559 ParserLauncherType 
560 Launcher_cpp::ParseXmlFile(std::string xmlExecuteFile)
561 {
562   ParserLauncherType job_params;
563   SALOME_Launcher_Handler * handler = new SALOME_Launcher_Handler(job_params);
564
565   const char* aFilePath = xmlExecuteFile.c_str();
566   FILE* aFile = fopen(aFilePath, "r");
567   if (aFile != NULL)
568   {
569     xmlDocPtr aDoc = xmlReadFile(aFilePath, NULL, 0);
570     if (aDoc != NULL)
571       handler->ProcessXmlDocument(aDoc);
572     else
573     {
574       std::string message = "ResourcesManager_cpp: could not parse file: " + xmlExecuteFile;
575       LAUNCHER_MESSAGE(message);
576       delete handler;
577       throw LauncherException(message);
578     }
579     // Free the document
580     xmlFreeDoc(aDoc);
581     fclose(aFile);
582   }
583   else
584   {
585     std::string message = "ResourcesManager_cpp: file is not readable: " + xmlExecuteFile;
586     LAUNCHER_MESSAGE(message);
587     delete handler;
588     throw LauncherException(message);
589   }
590
591   // Return
592   delete handler;
593   return job_params;
594 }
595
596 std::map<int, Launcher::Job *>
597 Launcher_cpp::getJobs()
598 {
599   return _launcher_job_map;
600 }
601
602 void 
603 Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
604 {
605   int job_id = job->getNumber();
606
607   // Select a ressource for the job
608   std::vector<std::string> ResourceList;
609   resourceParams params = job->getResourceRequiredParams();
610   // Consider only resources that can launch batch jobs
611   params.can_launch_batch_jobs = true;
612   try
613   {
614     ResourceList = _ResManager->GetFittingResources(params);
615   }
616   catch(const ResourcesException &ex)
617   {
618     throw LauncherException(ex.msg.c_str());
619   }
620   if (ResourceList.size() == 0)
621   {
622     LAUNCHER_INFOS("No adequate resource found for the job, number " << job->getNumber());
623     job->setState("ERROR");
624     throw LauncherException("No resource found the job");
625   }
626
627   // Configure the job with the resource selected - the first of the list
628   ParserResourcesType resource_definition = _ResManager->GetResourcesDescr(ResourceList[0]);
629
630   // Set resource definition to the job
631   // The job will check if the definitions needed
632   try
633   {
634     job->setResourceDefinition(resource_definition);
635   }
636   catch(const LauncherException &ex)
637   {
638     LAUNCHER_INFOS("Error in the definition of the resource, mess: " << ex.msg);
639     job->setState("ERROR");
640     throw ex;
641   }
642
643   // Step 2: We can now add a Factory if the resource is correctly define
644 #ifdef WITH_LIBBATCH
645   std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
646   if(it == _batchmap.end())
647   {
648     try
649     {
650       // Warning cannot write on one line like this, because map object is constructed before
651       // the method is called...
652       //_batchmap[job_id] = FactoryBatchManager(resource_definition);
653       Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
654       _batchmap[job_id] = batch_client;
655     }
656     catch(const LauncherException &ex)
657     {
658       LAUNCHER_INFOS("Error during creation of the batch manager of the job, mess: " << ex.msg);
659       throw ex;
660     }
661     catch(const Batch::GenericException &ex)
662     {
663       LAUNCHER_INFOS("Error during creation of the batch manager of the job, mess: " << ex.message);
664       throw LauncherException(ex.message);
665     }
666   }
667 #endif
668 }
669
670 void 
671 Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job, const std::string job_reference)
672 {
673   // Step 0: Calculated job_id
674   pthread_mutex_lock(_job_cpt_mutex);
675   int job_id = _job_cpt;
676   _job_cpt++;
677   new_job->setNumber(job_id);
678   pthread_mutex_unlock(_job_cpt_mutex);
679
680   // Step 1: check if resource is already in the map
681   createBatchManagerForJob(new_job);
682
683   // Step 2: add the job to the batch manager
684 #ifdef WITH_LIBBATCH
685   try
686   {
687     Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
688                                                                   job_reference);
689     new_job->setBatchManagerJobId(batch_manager_job_id);
690   }
691   catch(const Batch::GenericException &ex)
692   {
693     LAUNCHER_INFOS("Job cannot be added, exception in addJob: " << ex.message);
694     throw LauncherException(ex.message.c_str());
695   }
696
697   // Step 3: add job to launcher map
698   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
699   if (it_job == _launcher_job_map.end())
700     _launcher_job_map[new_job->getNumber()] = new_job;
701   else
702   {
703     LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
704     delete new_job;
705     throw LauncherException("A job as already the same id - job is not created !");
706   }
707   LAUNCHER_MESSAGE("New job added");
708 #endif
709 }