Salome HOME
Porting functionality on Win32 Platform
[modules/kernel.git] / src / Launcher / Launcher.cxx
1 // Copyright (C) 2005  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
2 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
3 // 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either 
7 // version 2.1 of the License.
8 // 
9 // This library is distributed in the hope that it will be useful 
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of 
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
12 // Lesser General Public License for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public  
15 // License along with this library; if not, write to the Free Software 
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 //
18 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 //
20
21 #include "Launcher.hxx"
22
23 #include "Batch_Date.hxx"
24 #include "Batch_FactBatchManager_eLSF.hxx"
25 #include "Batch_FactBatchManager_ePBS.hxx"
26 #include "Batch_BatchManager_eClient.hxx"
27 #include "utilities.h"
28
29 #include <iostream>
30 #include <sstream>
31 #include <sys/stat.h>
32
33 using namespace std;
34
35 //=============================================================================
36 /*! 
37  *  Constructor
38  *  \param orb
39  *  Define a CORBA single thread policy for the server, which avoid to deal
40  *  with non thread-safe usage like Change_Directory in SALOME naming service
41  */
42 //=============================================================================
43
44 Launcher_cpp::Launcher_cpp()
45 {
46   MESSAGE ( "Launcher_cpp constructor" );
47 }
48
49 //=============================================================================
50 /*! 
51  * destructor
52  */
53 //=============================================================================
54
55 Launcher_cpp::~Launcher_cpp()
56 {
57   MESSAGE ( "Launcher_cpp destructor" );
58   std::map < string, Batch::BatchManager_eClient * >::const_iterator it1;
59   for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
60     delete it1->second;
61   std::map < std::pair<std::string,long> , Batch::Job* >::const_iterator it2;
62   for(it2=_jobmap.begin();it2!=_jobmap.end();it2++)
63     delete it2->second;
64 }
65
66 //=============================================================================
67 /*! CORBA Method:
68  *  Submit a batch job on a cluster and returns the JobId
69  *  \param fileToExecute      : .py/.exe/.sh/... to execute on the batch cluster
70  *  \param filesToExport      : to export on the batch cluster
71  *  \param NumberOfProcessors : Number of processors needed on the batch cluster
72  *  \param params             : Constraints for the choice of the batch cluster
73  */
74 //=============================================================================
75 long Launcher_cpp::submitSalomeJob( const string fileToExecute ,
76                                     const vector<string>& filesToExport ,
77                                     const vector<string>& filesToImport ,
78                                     const batchParams& batch_params,
79                                     const machineParams& params) throw(LauncherException)
80 {
81   MESSAGE ( "BEGIN OF Launcher_cpp::submitSalomeJob" );
82   long jobId;
83   vector<string> aMachineList;
84
85   // check batch params
86   if ( !check(batch_params) )
87     throw LauncherException("Batch parameters are bad (see informations above)");
88
89   // find a cluster matching the structure params
90   vector<string> aCompoList ;
91   try{
92     aMachineList = _ResManager->GetFittingResources(params, aCompoList);
93   }
94   catch(const ResourcesException &ex){
95     throw LauncherException(ex.msg.c_str());
96   }
97   if (aMachineList.size() == 0)
98     throw LauncherException("No resources have been found with your parameters");
99
100   ParserResourcesType p = _ResManager->GetResourcesList(aMachineList[0]);
101   string clustername(p.Alias);
102   MESSAGE ( "Choose cluster: " <<  clustername );
103   
104   // search batch manager for that cluster in map or instanciate one
105   map < string, Batch::BatchManager_eClient * >::const_iterator it = _batchmap.find(clustername);
106   if(it == _batchmap.end())
107     {
108       _batchmap[clustername] = FactoryBatchManager(p);
109       // TODO: Add a test for the cluster !
110     }
111     
112   try{
113     // tmp directory on cluster to put files to execute
114     string tmpdir = getTmpDirForBatchFiles();
115
116     // create and submit job on cluster
117     Batch::Parametre param;
118     param[USER] = p.UserName;
119     param[EXECUTABLE] = buildSalomeCouplingScript(fileToExecute,tmpdir,p);
120     param[INFILE] = Batch::Couple( fileToExecute, getRemoteFile(tmpdir,fileToExecute) );
121     for(int i=0;i<filesToExport.size();i++)
122       param[INFILE] += Batch::Couple( filesToExport[i], getRemoteFile(tmpdir,filesToExport[i]) );
123
124
125     ostringstream file_name_output;
126     file_name_output << "~/" << tmpdir << "/" << "runSalome.output.log*";
127     ostringstream file_name_error;
128     file_name_error << "~/" << tmpdir << "/" << "runSalome.error.log*";
129     ostringstream file_container_log;
130     file_container_log << "~/" << tmpdir << "/" << "YACS_Server*";
131     param[OUTFILE] = Batch::Couple( "", file_name_output.str());
132     param[OUTFILE] += Batch::Couple( "", file_name_error.str());
133     param[OUTFILE] += Batch::Couple( "", file_container_log.str());
134
135     for(int i=0;i<filesToImport.size();i++)
136       param[OUTFILE] += Batch::Couple( "", filesToImport[i] );
137
138     param[NBPROC] = batch_params.nb_proc;
139     param[WORKDIR] = batch_params.batch_directory;
140     param[TMPDIR] = tmpdir;
141     param[MAXWALLTIME] = getWallTime(batch_params.expected_during_time);
142     param[MAXRAMSIZE] = getRamSize(batch_params.mem);
143     param[HOMEDIR] = getHomeDir(p, tmpdir);
144
145     Batch::Environnement env;
146
147     Batch::Job* job = new Batch::Job(param,env);
148
149     // submit job on cluster
150     Batch::JobId jid = _batchmap[clustername]->submitJob(*job);
151
152     // get job id in long
153     istringstream iss(jid.getReference());
154     iss >> jobId;
155
156     _jobmap[ pair<string,long>(clustername,jobId) ] = job;    
157   }
158   catch(const Batch::EmulationException &ex){
159     throw LauncherException(ex.msg.c_str());
160   }
161
162   return jobId;
163 }
164
165 //=============================================================================
166 /*! CORBA Method:
167  *  Query a batch job on a cluster and returns the status of job
168  *  \param jobId              : identification of Salome job
169  *  \param params             : Constraints for the choice of the batch cluster
170  */
171 //=============================================================================
172 string Launcher_cpp::querySalomeJob( long id, 
173                                      const machineParams& params) throw(LauncherException)
174 {
175   // find a cluster matching params structure
176   vector<string> aCompoList ;
177   vector<string> aMachineList = _ResManager->GetFittingResources( params , aCompoList ) ;
178   ParserResourcesType p = _ResManager->GetResourcesList(aMachineList[0]);
179   string clustername(p.Alias);
180     
181   // search batch manager for that cluster in map
182   std::map < string, Batch::BatchManager_eClient * >::const_iterator it = _batchmap.find(clustername);
183   if(it == _batchmap.end())
184     throw LauncherException("no batchmanager for that cluster");
185     
186   ostringstream oss;
187   oss << id;
188   Batch::JobId jobId( _batchmap[clustername], oss.str() );
189
190   Batch::JobInfo jinfo = jobId.queryJob();
191   Batch::Parametre par = jinfo.getParametre();
192   return par[STATE];
193 }
194
195 //=============================================================================
196 /*! CORBA Method:
197  *  Delete a batch job on a cluster 
198  *  \param jobId              : identification of Salome job
199  *  \param params             : Constraints for the choice of the batch cluster
200  */
201 //=============================================================================
202 void Launcher_cpp::deleteSalomeJob( const long id, 
203                                     const machineParams& params) throw(LauncherException)
204 {
205   // find a cluster matching params structure
206   vector<string> aCompoList ;
207   vector<string> aMachineList = _ResManager->GetFittingResources( params , aCompoList ) ;
208   ParserResourcesType p = _ResManager->GetResourcesList(aMachineList[0]);
209   string clustername(p.Alias);
210     
211   // search batch manager for that cluster in map
212   map < string, Batch::BatchManager_eClient * >::const_iterator it = _batchmap.find(clustername);
213   if(it == _batchmap.end())
214     throw LauncherException("no batchmanager for that cluster");
215   
216   ostringstream oss;
217   oss << id;
218   Batch::JobId jobId( _batchmap[clustername], oss.str() );
219
220   jobId.deleteJob();
221 }
222
223 //=============================================================================
224 /*! CORBA Method:
225  *  Get result files of job on a cluster
226  *  \param jobId              : identification of Salome job
227  *  \param params             : Constraints for the choice of the batch cluster
228  */
229 //=============================================================================
230 void Launcher_cpp::getResultSalomeJob( const string directory,
231                                        const long id, 
232                                        const machineParams& params) throw(LauncherException)
233 {
234   vector<string> aCompoList ;
235   vector<string> aMachineList = _ResManager->GetFittingResources( params , aCompoList ) ;
236   ParserResourcesType p = _ResManager->GetResourcesList(aMachineList[0]);
237   string clustername(p.Alias);
238     
239   // search batch manager for that cluster in map
240   map < string, Batch::BatchManager_eClient * >::const_iterator it = _batchmap.find(clustername);
241   if(it == _batchmap.end())
242     throw LauncherException("no batchmanager for that cluster");
243     
244   Batch::Job* job = _jobmap[ pair<string,long>(clustername,id) ];
245
246   _batchmap[clustername]->importOutputFiles( *job, directory );
247 }
248
249 //=============================================================================
250 /*!
251  *  Factory to instanciate the good batch manager for choosen cluster.
252  */ 
253 //=============================================================================
254
255 Batch::BatchManager_eClient *Launcher_cpp::FactoryBatchManager( const ParserResourcesType& params ) throw(LauncherException)
256 {
257
258   std::string hostname, protocol, mpi;
259   Batch::FactBatchManager_eClient* fact;
260
261   hostname = params.Alias;
262   switch(params.Protocol){
263   case rsh:
264     protocol = "rsh";
265     break;
266   case ssh:
267     protocol = "ssh";
268     break;
269   default:
270     throw LauncherException("unknown protocol");
271     break;
272   }
273   switch(params.mpi){
274   case lam:
275     mpi = "lam";
276     break;
277   case mpich1:
278     mpi = "mpich1";
279     break;
280   case mpich2:
281     mpi = "mpich2";
282     break;
283   case openmpi:
284     mpi = "openmpi";
285     break;
286   case slurm:
287     mpi = "slurm";
288     break;
289   default:
290     mpi = "indif";
291     break;
292   }    
293   MESSAGE ( "Instanciation of batch manager" );
294   switch( params.Batch ){
295   case pbs:
296     MESSAGE ( "Instantiation of PBS batch manager" );
297     fact = new Batch::FactBatchManager_ePBS;
298     break;
299   case lsf:
300     MESSAGE ( "Instantiation of LSF batch manager" );
301     fact = new Batch::FactBatchManager_eLSF;
302     break;
303   default:
304     MESSAGE ( "BATCH = " << params.Batch );
305     throw LauncherException("no batchmanager for that cluster");
306   }
307   return (*fact)(hostname.c_str(),protocol.c_str(),mpi.c_str());
308 }
309
310 string Launcher_cpp::buildSalomeCouplingScript(const string fileToExecute, const string dirForTmpFiles, const ParserResourcesType& params)
311 {
312 #ifndef WIN32 //TODO: need for porting on Windows
313   int idx = dirForTmpFiles.find("Batch/");
314   std::string filelogtemp = dirForTmpFiles.substr(idx+6, dirForTmpFiles.length());
315
316   string::size_type p1 = fileToExecute.find_last_of("/");
317   string::size_type p2 = fileToExecute.find_last_of(".");
318   std::string fileNameToExecute = fileToExecute.substr(p1+1,p2-p1-1);
319   std::string TmpFileName = "/tmp/runSalome_" + fileNameToExecute + ".sh";
320
321   MpiImpl* mpiImpl = FactoryMpiImpl(params.mpi);
322
323   ofstream tempOutputFile;
324   tempOutputFile.open(TmpFileName.c_str(), ofstream::out );
325
326   // Begin
327   tempOutputFile << "#! /bin/sh -f" << endl ;
328   tempOutputFile << "cd " ;
329   tempOutputFile << params.AppliPath << endl ;
330   tempOutputFile << "export SALOME_BATCH=1\n";
331   tempOutputFile << "export PYTHONPATH=~/" ;
332   tempOutputFile << dirForTmpFiles ;
333   tempOutputFile << ":$PYTHONPATH" << endl ;
334
335   // Test node rank
336   tempOutputFile << "if test " ;
337   tempOutputFile << mpiImpl->rank() ;
338   tempOutputFile << " = 0; then" << endl ;
339
340   // -----------------------------------------------
341   // Code for rank 0 : launch runAppli and a container
342   // RunAppli
343   if(params.ModulesList.size()>0)
344     tempOutputFile << "  ./runAppli --terminal --modules=" ;
345   else
346     tempOutputFile << "  ./runAppli --terminal ";
347   for ( int i = 0 ; i < params.ModulesList.size() ; i++ ) {
348     tempOutputFile << params.ModulesList[i] ;
349     if ( i != params.ModulesList.size()-1 )
350       tempOutputFile << "," ;
351   }
352   tempOutputFile << " --standalone=registry,study,moduleCatalog --ns-port-log="
353                  << filelogtemp 
354                  << " &\n";
355
356   // Wait NamingService
357   tempOutputFile << "  current=0\n"
358                  << "  stop=20\n" 
359                  << "  while ! test -f " << filelogtemp << "\n"
360                  << "  do\n"
361                  << "    sleep 2\n"
362                  << "    let current=current+1\n"
363                  << "    if [ \"$current\" -eq \"$stop\" ] ; then\n"
364                  << "      echo Error Naming Service failed ! >&2"
365                  << "      exit\n"
366                  << "    fi\n"
367                  << "  done\n"
368                  << "  port=`cat " << filelogtemp << "`\n";
369     
370   // Wait other containers
371   tempOutputFile << "  for ((ip=1; ip < ";
372   tempOutputFile << mpiImpl->size();
373   tempOutputFile << " ; ip++))" << endl;
374   tempOutputFile << "  do" << endl ;
375   tempOutputFile << "    arglist=\"$arglist YACS_Server_\"$ip" << endl ;
376   tempOutputFile << "  done" << endl ;
377   tempOutputFile << "  sleep 5" << endl ;
378   tempOutputFile << "  ./runSession waitContainers.py $arglist" << endl ;
379   
380   // Launch user script
381   tempOutputFile << "  ./runSession python ~/" << dirForTmpFiles << "/" << fileNameToExecute << ".py" << endl;
382
383   // Stop application
384   tempOutputFile << "  rm " << filelogtemp << "\n"
385                  << "  ./runSession shutdownSalome.py" << endl;
386
387   // -------------------------------------
388   // Other nodes launch a container
389   tempOutputFile << "else" << endl ;
390
391   // Wait NamingService
392   tempOutputFile << "  current=0\n"
393                  << "  stop=20\n" 
394                  << "  while ! test -f " << filelogtemp << "\n"
395                  << "  do\n"
396                  << "    sleep 2\n"
397                  << "    let current=current+1\n"
398                  << "    if [ \"$current\" -eq \"$stop\" ] ; then\n"
399                  << "      echo Error Naming Service failed ! >&2"
400                  << "      exit\n"
401                  << "    fi\n"
402                  << "  done\n"
403                  << "  port=`cat " << filelogtemp << "`\n";
404
405   // Launching container
406   tempOutputFile << "  ./runSession SALOME_Container YACS_Server_";
407   tempOutputFile << mpiImpl->rank()
408                  << " > ~/" << dirForTmpFiles << "/YACS_Server_" 
409                  << mpiImpl->rank() << "_container_log." << filelogtemp
410                  << " 2>&1\n";
411   tempOutputFile << "fi" << endl ;
412   tempOutputFile.flush();
413   tempOutputFile.close();
414   chmod(TmpFileName.c_str(), 0x1ED);
415   MESSAGE ( TmpFileName.c_str() );
416
417   delete mpiImpl;
418
419   return TmpFileName;
420 #else
421   return "";
422 #endif
423     
424 }
425
426 MpiImpl *Launcher_cpp::FactoryMpiImpl(MpiImplType mpi) throw(LauncherException)
427 {
428   switch(mpi){
429   case lam:
430     return new MpiImpl_LAM();
431   case mpich1:
432     return new MpiImpl_MPICH1();
433   case mpich2:
434     return new MpiImpl_MPICH2();
435   case openmpi:
436     return new MpiImpl_OPENMPI();
437   case slurm:
438     return new MpiImpl_SLURM();
439   case indif:
440     throw LauncherException("you must specify a mpi implementation in CatalogResources.xml file");
441   default:
442     ostringstream oss;
443     oss << mpi << " : not yet implemented";
444     throw LauncherException(oss.str().c_str());
445   }
446
447 }
448
449 string Launcher_cpp::getTmpDirForBatchFiles()
450 {
451   string ret;
452   string thedate;
453
454   // Adding date to the directory name
455   Batch::Date date = Batch::Date(time(0));
456   thedate = date.str();
457   int lend = thedate.size() ;
458   int i = 0 ;
459   while ( i < lend ) {
460     if ( thedate[i] == '/' || thedate[i] == '-' || thedate[i] == ':' ) {
461       thedate[i] = '_' ;
462     }
463     i++ ;
464   }
465
466   ret = string("Batch/");
467   ret += thedate;
468   return ret;
469 }
470
471 string Launcher_cpp::getRemoteFile( std::string remoteDir, std::string localFile )
472 {
473   string::size_type pos = localFile.find_last_of("/") + 1;
474   int ln = localFile.length() - pos;
475   string remoteFile = remoteDir + "/" + localFile.substr(pos,ln);
476   return remoteFile;
477 }
478
479 bool Launcher_cpp::check(const batchParams& batch_params)
480 {
481   bool rtn = true;
482   MESSAGE ( "Job parameters are :" );
483   MESSAGE ( "Directory : $HOME/Batch/$date" );
484
485   // check expected_during_time (check the format)
486   std::string edt_info = batch_params.expected_during_time;
487   std::string edt_value = batch_params.expected_during_time;
488   if (edt_value != "") {
489     std::string begin_edt_value = edt_value.substr(0, 2);
490     std::string mid_edt_value = edt_value.substr(2, 1);
491     std::string end_edt_value = edt_value.substr(3);
492   
493     long value;
494     std::istringstream iss(begin_edt_value);
495     if (!(iss >> value)) {
496       edt_info = "Error on definition ! : " + edt_value;
497       rtn = false;
498     }
499     else if (value < 0) {
500       edt_info = "Error on definition time is negative ! : " + value;
501       rtn = false;
502     }
503     std::istringstream iss_2(end_edt_value);
504     if (!(iss_2 >> value)) {
505       edt_info = "Error on definition ! : " + edt_value;
506       rtn = false;
507     }
508     else if (value < 0) {
509       edt_info = "Error on definition time is negative ! : " + value;
510       rtn = false;
511     }
512     if (mid_edt_value != ":") {
513       edt_info = "Error on definition ! :" + edt_value;
514       rtn = false;
515     }
516   }
517   else {
518     edt_info = "No value given";
519   }
520   MESSAGE ( "Expected during time : " << edt_info );
521
522   // check memory (check the format)
523   std::string mem_info;
524   std::string mem_value = batch_params.mem;
525   if (mem_value != "") {
526     std::string begin_mem_value = mem_value.substr(0, mem_value.length()-2);
527     long re_mem_value;
528     std::istringstream iss(begin_mem_value);
529     if (!(iss >> re_mem_value)) {
530       mem_info = "Error on definition ! : " + mem_value;
531       rtn = false;
532     }
533     else if (re_mem_value <= 0) {
534       mem_info = "Error on definition memory is negative ! : " + mem_value;
535       rtn = false;
536     }
537     std::string end_mem_value = mem_value.substr(mem_value.length()-2);
538     if (end_mem_value != "gb" && end_mem_value != "mb") {
539       mem_info = "Error on definition, type is bad ! " + mem_value;
540       rtn = false;
541     }
542   }
543   else {
544     mem_info = "No value given";
545   }
546   MESSAGE ( "Memory : " << mem_info );
547
548   // check nb_proc
549   std::string nb_proc_info;
550   ostringstream nb_proc_value;
551   nb_proc_value << batch_params.nb_proc;
552   if(batch_params.nb_proc <= 0) {
553     nb_proc_info = "Bad value ! nb_proc = ";
554     nb_proc_info += nb_proc_value.str();
555     rtn = false;
556   }
557   else {
558     nb_proc_info = nb_proc_value.str();
559   }
560   MESSAGE ( "Nb of processors : " << nb_proc_info );
561
562   return rtn;
563 }
564
565 long Launcher_cpp::getWallTime(std::string edt)
566 {
567   long hh, mm, ret;
568
569   if( edt.size() == 0 )
570     return 0;
571
572   string::size_type pos = edt.find(":");
573   string h = edt.substr(0,pos);
574   string m = edt.substr(pos+1,edt.size()-pos+1);
575   istringstream issh(h);
576   issh >> hh;
577   istringstream issm(m);
578   issm >> mm;
579   ret = hh*60 + mm;
580   return  ret;
581 }
582
583 long Launcher_cpp::getRamSize(std::string mem)
584 {
585   long mv;
586
587   if( mem.size() == 0 )
588     return 0;
589
590   string ram = mem.substr(0,mem.size()-2);
591   istringstream iss(ram);
592   iss >> mv;
593   string unity = mem.substr(mem.size()-2,2);
594   if( (unity.find("gb") != string::npos) || (unity.find("GB") != string::npos) )
595     return mv*1024;
596   else if( (unity.find("mb") != string::npos) || (unity.find("MB") != string::npos) )
597     return mv;
598   else if( (unity.find("kb") != string::npos) || (unity.find("KB") != string::npos) )
599     return mv/1024;
600   else if( (unity.find("b") != string::npos) || (unity.find("B") != string::npos) )
601     return mv/(1024*1024);
602   else
603     return 0;
604 }
605
606 std::string
607 Launcher_cpp::getHomeDir(const ParserResourcesType& p, const std::string& tmpdir)
608 {
609     std::string home;
610     std::string command;
611     int idx = tmpdir.find("Batch/");
612     std::string filelogtemp = tmpdir.substr(idx+6, tmpdir.length());
613     filelogtemp = "/tmp/logs" + filelogtemp + "_home";
614
615     if( p.Protocol == rsh )
616       command = "rsh ";
617     else if( p.Protocol == ssh )
618       command = "ssh ";
619     else
620       throw LauncherException("Unknown protocol");
621     if (p.UserName != ""){
622       command += p.UserName;
623       command += "@";
624     }
625     command += p.Alias;
626     command += " 'echo $HOME' > ";
627     command += filelogtemp;
628     MESSAGE ( command.c_str() );
629     int status = system(command.c_str());
630     if(status)
631       throw LauncherException("Error of launching home command on remote host");
632
633     std::ifstream file_home(filelogtemp.c_str());
634     std::getline(file_home, home);
635     file_home.close();
636     return home;
637 }