Salome HOME
Number of procs per node is now a job parameter instead of a BatchManager attribute
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
index ec873cddfd4491c8e8a8bf7f827c189166ab1db9..8ca72e476f27e8b55aa889b72b55d1f244110deb 100644 (file)
@@ -1,4 +1,4 @@
-//  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
+//  Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
 //
 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
@@ -50,6 +50,8 @@
 #include <signal.h>
 #include <errno.h>
 #include <string.h>
+
+#include "Batch_Constants.hxx"
 #include "Batch_IOMutex.hxx"
 #include "Batch_BatchManager_Local.hxx"
 #include "Batch_RunTimeException.hxx"
@@ -60,10 +62,10 @@ namespace Batch {
 
 
   // Constructeur
-  BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
-                                         CommunicationProtocolType protocolType)
-    : BatchManager(parent, host), _connect(0),
-      _protocol(CommunicationProtocol::getInstance(protocolType)),
+  BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
+                                         const char * username,
+                                         CommunicationProtocolType protocolType, const char * mpiImpl)
+    : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
       _idCounter(0)
   {
     pthread_mutex_init(&_threads_mutex, NULL);
@@ -88,14 +90,12 @@ namespace Batch {
     pthread_cond_destroy(&_threadSyncCondition);
   }
 
-  const CommunicationProtocol & BatchManager_Local::getProtocol() const
-  {
-    return _protocol;
-  }
-
   // Methode pour le controle des jobs : soumet un job au gestionnaire
   const JobId BatchManager_Local::submitJob(const Job & job)
   {
+    // export input files in the working directory of the execution host
+    exportInputFiles(job);
+
     Job_Local jobLocal = job;
     Id id = _idCounter++;
     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
@@ -137,17 +137,24 @@ namespace Batch {
     istringstream iss(jobid.getReference());
     iss >> id;
 
-    // On retrouve le thread_id du thread
-    pthread_t thread_id;
-
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     pthread_mutex_lock(&_threads_mutex);
-    if (_threads.find(id) != _threads.end())
-      thread_id = _threads[id].thread_id;
+    bool idFound = (_threads.find(id) != _threads.end());
+    if (idFound) {
+      string state = _threads[id].param[STATE];
+      if (state != FINISHED && state != FAILED) {
+        pthread_cancel(_threads[id].thread_id);
+        pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+      } else {
+        cout << "Cannot delete job " << jobid.getReference() <<
+                ". Job is already finished." << endl;
+      }
+    }
     pthread_mutex_unlock(&_threads_mutex);
     // @@@ --------> SECTION CRITIQUE <-------- @@@
 
-    cancel(thread_id);
+    if (!idFound)
+      throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
   }
 
   // Methode pour le controle des jobs : suspend un job en file d'attente
@@ -236,6 +243,14 @@ namespace Batch {
   }
 
 
+  // Ce manager ne peut pas reprendre un job
+  // On force donc l'état du job à erreur - pour cela on ne donne pas d'Id
+  // au JobId
+  const Batch::JobId
+  BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
+  {
+    return JobId(this, "undefined");
+  }
 
   // Methode pour le controle des jobs : teste si un job est present en machine
   bool BatchManager_Local::isRunning(const JobId & jobid)
@@ -255,15 +270,6 @@ namespace Batch {
     return running;
   }
 
-  // Methode de destruction d'un job
-  void BatchManager_Local::cancel(pthread_t thread_id)
-  {
-    pthread_mutex_lock(&_threads_mutex);
-    pthread_cancel(thread_id);
-    pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
-    pthread_mutex_unlock(&_threads_mutex);
-  }
-
 
   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
   {
@@ -275,7 +281,11 @@ namespace Batch {
     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
 #endif
 
-    exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
+    string fileToExecute = param[EXECUTABLE].str();
+    string::size_type p1 = fileToExecute.find_last_of("/");
+    string fileNameToExecute = fileToExecute.substr(p1+1);
+
+    exec_sub_cmd << "cd " << param[WORKDIR] << " && ./" << fileNameToExecute;
 
     if (param.find(ARGUMENTS) != param.end()) {
       Versatile V = param[ARGUMENTS];
@@ -312,7 +322,7 @@ namespace Batch {
       user = string(it->second);
     }
 
-    return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
+    return _protocol.getExecCommandArgs(exec_sub_cmd.str(), _hostname, user);
   }
 
 
@@ -347,74 +357,9 @@ namespace Batch {
     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
     // sera prise en compte par pthread_testcancel()
     Process child;
+    pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
-    pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
-
-
-    // Le code retour cumule (ORed) de tous les appels
-    // Nul en cas de reussite de l'ensemble des operations
-    int rc = 0;
-
-    // Cette table contient la liste des fichiers a detruire a la fin du processus
-    std::vector<string> files_to_delete;
-
-
-
-    // On copie les fichiers d'entree pour le fils
-    const Parametre param   = p_ta->_job.getParametre();
-    Parametre::const_iterator it;
-
-    // On initialise la variable workdir a la valeur du Current Working Directory
-    char * cwd =
-#ifdef WIN32
-      _getcwd(NULL, 0);
-#else
-      new char [PATH_MAX];
-    getcwd(cwd, PATH_MAX);
-#endif
-    string workdir = cwd;
-    delete [] cwd;
-
-    if ( (it = param.find(WORKDIR)) != param.end() ) {
-      workdir = static_cast<string>( (*it).second );
-    }
-
-    string executionhost = string(param[EXECUTIONHOST]);
-    string user;
-    if ( (it = param.find(USER)) != param.end() ) {
-      user = string(it->second);
-    }
-
-    if ( (it = param.find(INFILE)) != param.end() ) {
-      Versatile V = (*it).second;
-      Versatile::iterator Vit;
-
-      for(Vit=V.begin(); Vit!=V.end(); Vit++) {
-        CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
-        Couple cp       = cpt;
-        string local    = cp.getLocal();
-        string remote   = cp.getRemote();
-
-       std::cerr << workdir << std::endl;
-       std::cerr << remote << std::endl;
-
-        int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
-                                                                    workdir + "/" + remote,
-                                                                    executionhost, user);
-        if (status) {
-          // Echec de la copie
-          rc |= 1;
-        } else {
-          // On enregistre le fichier comme etant a detruire
-          files_to_delete.push_back(workdir + "/" + remote);
-        }
-
-      }
-    }
-
-
-
 
     // On forke/exec un nouveau process pour pouvoir controler le fils
     // (plus finement qu'avec un appel system)
@@ -437,47 +382,6 @@ namespace Batch {
     }
 #endif
 
-
-    // On copie les fichiers de sortie du fils
-    if ( (it = param.find(OUTFILE)) != param.end() ) {
-      Versatile V = (*it).second;
-      Versatile::iterator Vit;
-
-      for(Vit=V.begin(); Vit!=V.end(); Vit++) {
-        CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
-        Couple cp       = cpt;
-        string local    = cp.getLocal();
-        string remote   = cp.getRemote();
-
-        int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
-                                                                    executionhost, user,
-                                                                    local, "", "");
-        if (status) {
-          // Echec de la copie
-          rc |= 1;
-        } else {
-          // On enregistre le fichier comme etant a detruire
-          files_to_delete.push_back(workdir + "/" + remote);
-        }
-
-      }
-    }
-
-    // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
-    // ou si la creation du fils n'a pu avoir lieu
-    if ( (rc == 0) || (child < 0) ) {
-      std::vector<string>::const_iterator it;
-      for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
-        p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
-/*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
-        UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
-#ifdef WIN32
-        remove_cmd = string("\"") + remove_cmd + string("\"");
-#endif
-        system(remove_cmd.c_str());*/
-      }
-    }
-
     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
 
     // Set the job state to FINISHED or FAILED
@@ -541,7 +445,7 @@ namespace Batch {
     while (1) {
 #ifdef WIN32
       DWORD exitCode;
-      BOOL res = GetExitCodeProcess(child, &exitCode);
+      GetExitCodeProcess(child, &exitCode);
       if (exitCode != STILL_ACTIVE) {
         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
         break;
@@ -600,15 +504,15 @@ namespace Batch {
 
       // On regarde si le fils n'a pas depasse son temps (wallclock time)
       time_t child_currenttime = time(NULL);
-      time_t child_elapsedtime = child_currenttime - child_starttime;
+      long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
       if (param.find(MAXWALLTIME) != param.end()) {
-        int maxwalltime = param[MAXWALLTIME];
+        long maxwalltime = param[MAXWALLTIME];
         //       cout << "child_starttime          = " << child_starttime        << endl
         //            << "child_currenttime        = " << child_currenttime      << endl
         //            << "child_elapsedtime        = " << child_elapsedtime      << endl
         //            << "maxwalltime              = " << maxwalltime            << endl
         //            << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
-        if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
+        if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
           // On introduit une commande dans la queue du thread
           // @@@ --------> SECTION CRITIQUE <-------- @@@
@@ -619,7 +523,7 @@ namespace Batch {
           // @@@ --------> SECTION CRITIQUE <-------- @@@
 
 
-        } else if (child_elapsedtime > maxwalltime ) {
+        } else if (child_elapsedtime_minutes > maxwalltime ) {
           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
           // On introduit une commande dans la queue du thread
           // @@@ --------> SECTION CRITIQUE <-------- @@@
@@ -755,6 +659,9 @@ namespace Batch {
 
 
       // On positionne les limites systeme imposees au fils
+      // This part is deactivated because those limits should be set on the job process, not on
+      // the ssh process. If it is done properly one day, beware of the types used (int is not enough)
+      /*
       if (param.find(MAXCPUTIME) != param.end()) {
         int maxcputime = param[MAXCPUTIME];
         struct rlimit limit;
@@ -774,10 +681,11 @@ namespace Batch {
       if (param.find(MAXRAMSIZE) != param.end()) {
         int maxramsize = param[MAXRAMSIZE];
         struct rlimit limit;
-        limit.rlim_cur = maxramsize * 1024;
-        limit.rlim_max = int(maxramsize * 1.1) * 1024;
+        limit.rlim_cur = maxramsize * 1024 * 1024;
+        limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
         setrlimit(RLIMIT_AS, &limit);
       }
+      */
 
 
       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
@@ -798,7 +706,7 @@ namespace Batch {
 
 
       // On execute la commande du fils
-      int result = execve(argv[0], argv, envp);
+      execve(argv[0], argv, envp);
       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
       // No need to deallocate since nothing happens after a successful exec
 
@@ -873,7 +781,7 @@ namespace Batch {
       char * buffer = strdup(comstr.c_str());
 
       // launch the new process
-      BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
+      bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
 
       if (buffer) free(buffer);