Salome HOME
Merge from BR_LIBBATCH_2_0
[modules/yacs.git] / src / Launcher / Launcher.cxx
1 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #ifdef WITH_LIBBATCH
24 #include <libbatch/BatchManagerCatalog.hxx>
25 #include <libbatch/FactBatchManager.hxx>
26 #include <libbatch/BatchManager.hxx>
27 #endif
28
29 #include "Basics_Utils.hxx"
30 #include "Basics_DirUtils.hxx"
31 #include "SALOME_Launcher_Handler.hxx"
32 #include "Launcher.hxx"
33 #include "Launcher_Job_Command.hxx"
34 #include <iostream>
35 #include <sstream>
36 #include <sys/stat.h>
37 #include <time.h>
38
39 //=============================================================================
40 /*! 
41  *  Constructor
42  *  \param orb
43  *  Define a CORBA single thread policy for the server, which avoid to deal
44  *  with non thread-safe usage like Change_Directory in SALOME naming service
45  */
46 //=============================================================================
47 Launcher_cpp::Launcher_cpp()
48 {
49   LAUNCHER_MESSAGE("Launcher_cpp constructor");
50   _job_cpt = 0;
51   _job_cpt_mutex = new pthread_mutex_t();
52   pthread_mutex_init(_job_cpt_mutex, NULL);
53 }
54
55 //=============================================================================
56 /*! 
57  * destructor
58  */
59 //=============================================================================
60 Launcher_cpp::~Launcher_cpp()
61 {
62   LAUNCHER_MESSAGE("Launcher_cpp destructor");
63 #ifdef WITH_LIBBATCH
64   std::map<int, Launcher::Job *>::const_iterator it_job;
65   for(it_job = _launcher_job_map.begin(); it_job != _launcher_job_map.end(); it_job++)
66     delete it_job->second;
67   std::map <int, Batch::BatchManager * >::const_iterator it1;
68   for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
69     delete it1->second;
70 #endif
71
72   pthread_mutex_destroy(_job_cpt_mutex);
73   delete _job_cpt_mutex;
74 }
75
76 #ifdef WITH_LIBBATCH
77
78 //=============================================================================
79 /*!
80  * Add a job into the launcher - check resource and choose one 
81  */ 
82 //=============================================================================
83 void 
84 Launcher_cpp::createJob(Launcher::Job * new_job)
85 {
86   LAUNCHER_MESSAGE("Creating a new job");
87   // Add job to the jobs map
88   pthread_mutex_lock(_job_cpt_mutex);
89   new_job->setNumber(_job_cpt);
90   _job_cpt++;
91   pthread_mutex_unlock(_job_cpt_mutex);
92   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
93   if (it_job == _launcher_job_map.end())
94     _launcher_job_map[new_job->getNumber()] = new_job;
95   else
96   {
97     LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
98     delete new_job;
99     throw LauncherException("A job as already the same id - job is not created !");
100   }
101   LAUNCHER_MESSAGE("New Job created");
102 }
103
104 //=============================================================================
105 /*!
106  * Launch a job 
107  */ 
108 //=============================================================================
109 void 
110 Launcher_cpp::launchJob(int job_id)
111 {
112   LAUNCHER_MESSAGE("Launch a job");
113
114   // Check if job exist
115   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
116   if (it_job == _launcher_job_map.end())
117   {
118     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
119     throw LauncherException("Cannot find the job, is it created ?");
120   }
121
122   Launcher::Job * job = it_job->second;
123
124   // Check job state (cannot launch a job already launched...)
125   if (job->getState() != "CREATED")
126   {
127     LAUNCHER_INFOS("Bad state of the job: " << job->getState());
128     throw LauncherException("Bad state of the job: " + job->getState());
129   }
130
131   // Third step search batch manager for the job into the map -> instanciate one if does not exist
132 #ifdef WITH_LIBBATCH
133   std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
134   if(it == _batchmap.end())
135   {
136     createBatchManagerForJob(job);
137   }
138 #endif
139
140   try {
141     Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
142     job->setBatchManagerJobId(batch_manager_job_id);
143     job->setState("QUEUED");
144   }
145   catch(const Batch::GenericException &ex)
146   {
147     LAUNCHER_INFOS("Job is not launched, exception in submitJob: " << ex.message);
148     throw LauncherException(ex.message.c_str());
149   }
150   LAUNCHER_MESSAGE("Job launched");
151 }
152
153 //=============================================================================
154 /*!
155  * Get job state
156  */ 
157 //=============================================================================
158 const char *
159 Launcher_cpp::getJobState(int job_id)
160 {
161   LAUNCHER_MESSAGE("Get job state");
162
163   // Check if job exist
164   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
165   if (it_job == _launcher_job_map.end())
166   {
167     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
168     throw LauncherException("Cannot find the job, is it created ?");
169   }
170
171   Launcher::Job * job = it_job->second;
172
173   std::string state;
174   try
175   {
176     state = job->updateJobState();
177   }
178   catch(const Batch::GenericException &ex)
179   {
180     LAUNCHER_INFOS("getJobState failed, exception: " << ex.message);
181     throw LauncherException(ex.message.c_str());
182   }
183
184   return state.c_str();
185 }
186
187 //=============================================================================
188 /*!
189  * Get Job result - the result directory could be changed
190  */ 
191 //=============================================================================
192 void
193 Launcher_cpp::getJobResults(int job_id, std::string directory)
194 {
195   LAUNCHER_MESSAGE("Get Job results");
196
197   // Check if job exist
198   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
199   if (it_job == _launcher_job_map.end())
200   {
201     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
202     throw LauncherException("Cannot find the job, is it created ?");
203   }
204
205   Launcher::Job * job = it_job->second;
206   std::string resource_name = job->getResourceDefinition().Name;
207   try 
208   {
209     if (directory != "")
210       _batchmap[job_id]->importOutputFiles(*(job->getBatchJob()), directory);
211     else
212       _batchmap[job_id]->importOutputFiles(*(job->getBatchJob()), job->getResultDirectory());
213   }
214   catch(const Batch::GenericException &ex)
215   {
216     LAUNCHER_INFOS("getJobResult is maybe incomplete, exception: " << ex.message);
217     throw LauncherException(ex.message.c_str());
218   }
219   LAUNCHER_MESSAGE("getJobResult ended");
220 }
221
222 //=============================================================================
223 /*!
224  * Get Job dump state - the result directory could be changed
225  */ 
226 //=============================================================================
227 bool
228 Launcher_cpp::getJobDumpState(int job_id, std::string directory)
229 {
230   bool rtn;
231   LAUNCHER_MESSAGE("Get Job dump state");
232
233   // Check if job exist
234   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
235   if (it_job == _launcher_job_map.end())
236   {
237     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
238     throw LauncherException("Cannot find the job, is it created ?");
239   }
240
241   Launcher::Job * job = it_job->second;
242   std::string resource_name = job->getResourceDefinition().Name;
243   try 
244   {
245     if (directory != "")
246       rtn = _batchmap[job_id]->importDumpStateFile(*(job->getBatchJob()), directory);
247     else
248       rtn = _batchmap[job_id]->importDumpStateFile(*(job->getBatchJob()), job->getResultDirectory());
249   }
250   catch(const Batch::GenericException &ex)
251   {
252     LAUNCHER_INFOS("getJobResult is maybe incomplete, exception: " << ex.message);
253     throw LauncherException(ex.message.c_str());
254   }
255   LAUNCHER_MESSAGE("getJobResult ended");
256   return rtn;
257 }
258
259 //=============================================================================
260 /*!
261  * Remove the job - into the Launcher and its batch manager
262  */ 
263 //=============================================================================
264 void
265 Launcher_cpp::removeJob(int job_id)
266 {
267   LAUNCHER_MESSAGE("Remove Job");
268
269   // Check if job exist
270   std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
271   if (it_job == _launcher_job_map.end())
272   {
273     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
274     throw LauncherException("Cannot find the job, is it created ?");
275   }
276
277   it_job->second->removeJob();
278   delete it_job->second;
279   _launcher_job_map.erase(it_job);
280 }
281
282 //=============================================================================
283 /*!
284  * stop the job
285  */ 
286 //=============================================================================
287 void
288 Launcher_cpp::stopJob(int job_id)
289 {
290   LAUNCHER_MESSAGE("Stop Job");
291
292   // Check if job exist
293   std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
294   if (it_job == _launcher_job_map.end())
295   {
296     LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
297     throw LauncherException("Cannot find the job, is it created ?");
298   }
299
300   it_job->second->stopJob();
301 }
302
303 //=============================================================================
304 /*! 
305  *  create a launcher job based on a file
306  *  \param xmlExecuteFile     : to define the execution on the batch cluster
307  */
308 //=============================================================================
309 long 
310 Launcher_cpp::createJobWithFile(const std::string xmlExecuteFile, 
311                                 const std::string clusterName)
312 {
313   LAUNCHER_MESSAGE("Begin of Launcher_cpp::createJobWithFile");
314
315   // Parsing xml file
316   ParserLauncherType job_params = ParseXmlFile(xmlExecuteFile);
317
318   // Creating a new job
319   Launcher::Job_Command * new_job = new Launcher::Job_Command();
320
321   std::string cmdFile = Kernel_Utils::GetTmpFileName();  
322 #ifndef WIN32
323   cmdFile += ".sh";
324 #else
325   cmdFile += ".bat";
326 #endif
327   std::ofstream os;
328   os.open(cmdFile.c_str(), std::ofstream::out );
329   os << "#! /bin/sh" << std::endl;
330   os << job_params.Command;
331   os.close();
332
333   new_job->setJobFile(cmdFile);
334   new_job->setLocalDirectory(job_params.RefDirectory);
335   new_job->setWorkDirectory(job_params.MachinesList[clusterName].WorkDirectory);
336   new_job->setEnvFile(job_params.MachinesList[clusterName].EnvFile);
337
338   for(int i=0; i < job_params.InputFile.size(); i++)
339     new_job->add_in_file(job_params.InputFile[i]);
340   for(int i=0; i < job_params.OutputFile.size();i++)
341     new_job->add_out_file(job_params.OutputFile[i]);
342
343   resourceParams p;
344   p.hostname = clusterName;
345   p.name = "";
346   p.OS = "";
347   p.nb_proc = job_params.NbOfProcesses;
348   p.nb_node = 0;
349   p.nb_proc_per_node = 0;
350   p.cpu_clock = 0;
351   p.mem_mb = 0;
352   new_job->setResourceRequiredParams(p);
353
354   createJob(new_job);
355   return new_job->getNumber();
356 }
357
358 //=============================================================================
359 /*!
360  *  Factory to instanciate the good batch manager for choosen cluster.
361  */ 
362 //=============================================================================
363 Batch::BatchManager *
364 Launcher_cpp::FactoryBatchManager(ParserResourcesType& params)
365 {
366   std::string mpi;
367   Batch::CommunicationProtocolType protocol;
368   Batch::FactBatchManager * fact;
369
370   std::string hostname = params.HostName;
371
372   switch(params.Protocol)
373   {
374     case sh:
375       protocol = Batch::SH;
376       break;
377     case rsh:
378       protocol = Batch::RSH;
379       break;
380     case ssh:
381       protocol = Batch::SSH;
382       break;
383     default:
384       throw LauncherException("Unknown protocol for this resource");
385       break;
386   }
387
388   switch(params.mpi)
389   {
390     case lam:
391       mpi = "lam";
392       break;
393     case mpich1:
394       mpi = "mpich1";
395       break;
396     case mpich2:
397       mpi = "mpich2";
398       break;
399     case openmpi:
400       mpi = "openmpi";
401       break;
402     case ompi:
403       mpi = "ompi";
404       break;
405     case slurmmpi:
406       mpi = "slurmmpi";
407       break;
408     case prun:
409       mpi = "prun";
410       break;
411     default:
412       mpi = "nompi";
413   }
414
415   const char * bmType;
416   switch( params.Batch )
417   {
418     case pbs:
419       bmType = "PBS";
420       break;
421     case lsf:
422       bmType = "LSF";
423       break;
424     case sge:
425       bmType = "SGE";
426       break;
427     case ccc:
428       bmType = "CCC";
429       break;
430     case slurm:
431       bmType = "SLURM";
432       break;
433     case ssh_batch:
434       bmType = "LOCAL";
435       break;
436     case ll:
437       bmType = "LL";
438       break;
439     case vishnu:
440       bmType = "VISHNU";
441       break;
442     default:
443       LAUNCHER_MESSAGE("Bad batch description of the resource: Batch = " << params.Batch);
444       throw LauncherException("No batchmanager for that cluster - Bad batch description of the resource");
445   }
446   Batch::BatchManagerCatalog & cata = Batch::BatchManagerCatalog::getInstance();
447   fact = dynamic_cast<Batch::FactBatchManager*>(cata(bmType));
448   if (fact == NULL) {
449     LAUNCHER_MESSAGE("Cannot find batch manager factory for " << bmType << ". Check your version of libBatch.");
450     throw LauncherException("Cannot find batch manager factory");
451   }
452   LAUNCHER_MESSAGE("Instanciation of batch manager of type: " << bmType);
453   Batch::BatchManager * batch_client = (*fact)(hostname.c_str(), params.UserName.c_str(),
454                                                protocol, mpi.c_str());
455   return batch_client;
456 }
457
458 //----------------------------------------------------------
459 // Without LIBBATCH - Launcher_cpp do nothing...
460 //----------------------------------------------------------
461 #else
462
463 void 
464 Launcher_cpp::createJob(Launcher::Job * new_job)
465 {
466   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot create a job !!!");
467   delete new_job;
468   throw LauncherException("Method Launcher_cpp::createJob is not available "
469                           "(libBatch was not present at compilation time)");
470 }
471
472 void 
473 Launcher_cpp::launchJob(int job_id)
474 {
475   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot launch a job !!!");
476   throw LauncherException("Method Launcher_cpp::launchJob is not available "
477                           "(libBatch was not present at compilation time)");
478 }
479
480 const char *
481 Launcher_cpp::getJobState(int job_id)
482 {
483   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job state!!!");
484   throw LauncherException("Method Launcher_cpp::getJobState is not available "
485                           "(libBatch was not present at compilation time)");
486 }
487
488 void
489 Launcher_cpp::getJobResults(int job_id, std::string directory)
490 {
491   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job results!!!");
492   throw LauncherException("Method Launcher_cpp::getJobResults is not available "
493                           "(libBatch was not present at compilation time)");
494 }
495
496 bool
497 Launcher_cpp::getJobDumpState(int job_id, std::string directory)
498 {
499   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job dump state!!!");
500   throw LauncherException("Method Launcher_cpp::getJobDumpState is not available "
501                           "(libBatch was not present at compilation time)");
502 }
503
504 void
505 Launcher_cpp::removeJob(int job_id)
506 {
507   LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot remove job!!!");
508   throw LauncherException("Method Launcher_cpp::removeJob is not available "
509                           "(libBatch was not present at compilation time)");
510 }
511
512 void
513 Launcher_cpp::stopJob(int job_id)
514 {
515   throw LauncherException("Method Launcher_cpp::stopJob is not available "
516                           "(libBatch was not present at compilation time)");
517 }
518
519 long 
520 Launcher_cpp::createJobWithFile( const std::string xmlExecuteFile, std::string clusterName)
521 {
522   throw LauncherException("Method Launcher_cpp::createJobWithFile is not available "
523                           "(libBatch was not present at compilation time)");
524   return 0;
525 }
526
527 #endif
528
529 ParserLauncherType 
530 Launcher_cpp::ParseXmlFile(std::string xmlExecuteFile)
531 {
532   ParserLauncherType job_params;
533   SALOME_Launcher_Handler * handler = new SALOME_Launcher_Handler(job_params);
534
535   const char* aFilePath = xmlExecuteFile.c_str();
536   FILE* aFile = fopen(aFilePath, "r");
537   if (aFile != NULL)
538   {
539     xmlDocPtr aDoc = xmlReadFile(aFilePath, NULL, 0);
540     if (aDoc != NULL)
541       handler->ProcessXmlDocument(aDoc);
542     else
543     {
544       std::string message = "ResourcesManager_cpp: could not parse file: " + xmlExecuteFile;
545       LAUNCHER_MESSAGE(message);
546       delete handler;
547       throw LauncherException(message);
548     }
549     // Free the document
550     xmlFreeDoc(aDoc);
551     fclose(aFile);
552   }
553   else
554   {
555     std::string message = "ResourcesManager_cpp: file is not readable: " + xmlExecuteFile;
556     LAUNCHER_MESSAGE(message);
557     delete handler;
558     throw LauncherException(message);
559   }
560
561   // Return
562   delete handler;
563   return job_params;
564 }
565
566 std::map<int, Launcher::Job *>
567 Launcher_cpp::getJobs()
568 {
569   return _launcher_job_map;
570 }
571
572 void 
573 Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
574 {
575   int job_id = job->getNumber();
576
577   // Select a ressource for the job
578   std::vector<std::string> ResourceList;
579   resourceParams params = job->getResourceRequiredParams();
580   // Consider only resources that can launch batch jobs
581   params.can_launch_batch_jobs = true;
582   try
583   {
584     ResourceList = _ResManager->GetFittingResources(params);
585   }
586   catch(const ResourcesException &ex)
587   {
588     throw LauncherException(ex.msg.c_str());
589   }
590   if (ResourceList.size() == 0)
591   {
592     LAUNCHER_INFOS("No adequate resource found for the job, number " << job->getNumber());
593     job->setState("ERROR");
594     throw LauncherException("No resource found the job");
595   }
596
597   // Configure the job with the resource selected - the first of the list
598   ParserResourcesType resource_definition = _ResManager->GetResourcesDescr(ResourceList[0]);
599
600   // Set resource definition to the job
601   // The job will check if the definitions needed
602   try
603   {
604     job->setResourceDefinition(resource_definition);
605   }
606   catch(const LauncherException &ex)
607   {
608     LAUNCHER_INFOS("Error in the definition of the resource, mess: " << ex.msg);
609     job->setState("ERROR");
610     throw ex;
611   }
612
613   // Step 2: We can now add a Factory if the resource is correctly define
614 #ifdef WITH_LIBBATCH
615   std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
616   if(it == _batchmap.end())
617   {
618     try
619     {
620       // Warning cannot write on one line like this, because map object is constructed before
621       // the method is called...
622       //_batchmap[job_id] = FactoryBatchManager(resource_definition);
623       Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
624       _batchmap[job_id] = batch_client;
625     }
626     catch(const LauncherException &ex)
627     {
628       LAUNCHER_INFOS("Error during creation of the batch manager of the job, mess: " << ex.msg);
629       throw ex;
630     }
631     catch(const Batch::GenericException &ex)
632     {
633       LAUNCHER_INFOS("Error during creation of the batch manager of the job, mess: " << ex.message);
634       throw LauncherException(ex.message);
635     }
636   }
637 #endif
638 }
639
640 void 
641 Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job, const std::string job_reference)
642 {
643   // Step 0: Calculated job_id
644   pthread_mutex_lock(_job_cpt_mutex);
645   int job_id = _job_cpt;
646   _job_cpt++;
647   new_job->setNumber(job_id);
648   pthread_mutex_unlock(_job_cpt_mutex);
649
650   // Step 1: check if resource is already in the map
651   createBatchManagerForJob(new_job);
652
653   // Step 2: add the job to the batch manager
654 #ifdef WITH_LIBBATCH
655   try
656   {
657     Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
658                                                                   job_reference);
659     new_job->setBatchManagerJobId(batch_manager_job_id);
660   }
661   catch(const Batch::GenericException &ex)
662   {
663     LAUNCHER_INFOS("Job cannot be added, exception in addJob: " << ex.message);
664     throw LauncherException(ex.message.c_str());
665   }
666
667   // Step 3: add job to launcher map
668   std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
669   if (it_job == _launcher_job_map.end())
670     _launcher_job_map[new_job->getNumber()] = new_job;
671   else
672   {
673     LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
674     delete new_job;
675     throw LauncherException("A job as already the same id - job is not created !");
676   }
677   LAUNCHER_MESSAGE("New job added");
678 #endif
679 }