Salome HOME
Changed year in copyrights
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
index 4166e3f80a2e5bcfced8ef3c4ad15a50de93dd5c..c7d3f60d0f857065feee0ac6a341f8d4bd9f45c2 100644 (file)
@@ -1,4 +1,4 @@
-//  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
+//  Copyright (C) 2007-2011  CEA/DEN, EDF R&D, OPEN CASCADE
 //
 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
@@ -41,7 +41,6 @@
 #include <sys/types.h>
 #ifdef WIN32
 #include <direct.h>
-#include "Batch_RunTimeException.hxx"
 #else
 #include <sys/wait.h>
 #include <unistd.h>
 #include <signal.h>
 #include <errno.h>
 #include <string.h>
+
+#include "Batch_Constants.hxx"
 #include "Batch_IOMutex.hxx"
 #include "Batch_BatchManager_Local.hxx"
+#include "Batch_RunTimeException.hxx"
 
 using namespace std;
 
@@ -62,21 +64,30 @@ namespace Batch {
   // Constructeur
   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
                                          CommunicationProtocolType protocolType)
-    : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
+    : BatchManager(parent, host), _connect(0),
       _protocol(CommunicationProtocol::getInstance(protocolType)),
-      _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
+      _idCounter(0)
   {
     pthread_mutex_init(&_threads_mutex, NULL);
-    pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
-    pthread_cond_init(&_thread_id_id_association_cond, NULL);
+    pthread_cond_init(&_threadSyncCondition, NULL);
   }
 
   // Destructeur
   BatchManager_Local::~BatchManager_Local()
   {
+    for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
+      pthread_mutex_lock(&_threads_mutex);
+      string state = iter->second.param[STATE];
+      if (state != FINISHED && state != FAILED) {
+        UNDER_LOCK( cout << "Warning: Job " << iter->first <<
+                            " is not finished, it will now be canceled." << endl );
+        pthread_cancel(iter->second.thread_id);
+        pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+      }
+      pthread_mutex_unlock(&_threads_mutex);
+    }
     pthread_mutex_destroy(&_threads_mutex);
-    pthread_mutex_destroy(&_thread_id_id_association_mutex);
-    pthread_cond_destroy(&_thread_id_id_association_cond);
+    pthread_cond_destroy(&_threadSyncCondition);
   }
 
   const CommunicationProtocol & BatchManager_Local::getProtocol() const
@@ -88,15 +99,36 @@ namespace Batch {
   const JobId BatchManager_Local::submitJob(const Job & job)
   {
     Job_Local jobLocal = job;
+    Id id = _idCounter++;
+    ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
+
+    // Les attributs du thread a sa creation
+    pthread_attr_t thread_attr;
+    pthread_attr_init(&thread_attr);
+    pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
+
+    // Creation du thread qui va executer la commande systeme qu'on lui passe
+    pthread_t thread_id;
+    pthread_mutex_lock(&_threads_mutex);
+    int rc = pthread_create(&thread_id,
+      &thread_attr,
+      &ThreadAdapter::run,
+      static_cast<void *>(p_ta));
 
-    pthread_t thread_id = submit(jobLocal);
+    // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
+    pthread_attr_destroy(&thread_attr);
 
-    ostringstream oss;
-    oss << getIdByThread_id(thread_id);
+    if (rc != 0) {
+      pthread_mutex_unlock(&_threads_mutex);
+      throw RunTimeException("Can't create new thread in BatchManager_Local");
+    }
 
-    JobId id(this, oss.str());
+    pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+    pthread_mutex_unlock(&_threads_mutex);
 
-    return id;
+    ostringstream id_sst;
+    id_sst << id;
+    return JobId(this, id_sst.str());
   }
 
   // Methode pour le controle des jobs : retire un job du gestionnaire
@@ -107,17 +139,24 @@ namespace Batch {
     istringstream iss(jobid.getReference());
     iss >> id;
 
-    // On retrouve le thread_id du thread
-    pthread_t thread_id;
-
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     pthread_mutex_lock(&_threads_mutex);
-    if (_threads.find(id) != _threads.end())
-      thread_id = _threads[id].thread_id;
+    bool idFound = (_threads.find(id) != _threads.end());
+    if (idFound) {
+      string state = _threads[id].param[STATE];
+      if (state != FINISHED && state != FAILED) {
+        pthread_cancel(_threads[id].thread_id);
+        pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+      } else {
+        cout << "Cannot delete job " << jobid.getReference() <<
+                ". Job is already finished." << endl;
+      }
+    }
     pthread_mutex_unlock(&_threads_mutex);
     // @@@ --------> SECTION CRITIQUE <-------- @@@
 
-    cancel(thread_id);
+    if (!idFound)
+      throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
   }
 
   // Methode pour le controle des jobs : suspend un job en file d'attente
@@ -189,17 +228,31 @@ namespace Batch {
     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     pthread_mutex_lock(&_threads_mutex);
-    param = _threads[id].param;
-    env   = _threads[id].env;
+    std::map<Id, Child >::iterator pos = _threads.find(id);
+    bool found = (pos != _threads.end());
+    if (found) {
+      param = pos->second.param;
+      env   = pos->second.env;
+    }
     pthread_mutex_unlock(&_threads_mutex);
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
 
+    if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
+
     JobInfo_Local ji(param, env);
     return ji;
   }
 
 
+  // Ce manager ne peut pas reprendre un job
+  // On force donc l'état du job à erreur - pour cela on ne donne pas d'Id
+  // au JobId
+  const Batch::JobId
+  BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
+  {
+    return JobId(this, "undefined");
+  }
 
   // Methode pour le controle des jobs : teste si un job est present en machine
   bool BatchManager_Local::isRunning(const JobId & jobid)
@@ -208,57 +261,15 @@ namespace Batch {
     istringstream iss(jobid.getReference());
     iss >> id;
 
-    Status status;
-
     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     pthread_mutex_lock(&_threads_mutex);
-    status = _threads[id].status;
+    bool running = (_threads[id].param[STATE].str() == RUNNING);
     pthread_mutex_unlock(&_threads_mutex);
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
 
-    return (status == RUNNING);
-  }
-
-
-  // Methode d'execution d'un job
-  pthread_t BatchManager_Local::submit(const Job_Local & job)
-  {
-    // L'id du thread a creer
-    pthread_t thread_id =
-#ifdef WIN32
-    {0,0};
-#else
-      0;
-#endif
-
-    // Les attributs du thread a sa creation
-    pthread_attr_t thread_attr;
-    pthread_attr_init(&thread_attr);
-    pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
-
-    ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
-
-    // Creation du thread qui va executer la commande systeme qu'on lui passe
-    int rc = pthread_create(&thread_id,
-      &thread_attr,
-      &ThreadAdapter::run,
-      static_cast<void *>(p_ta));
-    if (rc) {
-    }
-
-    // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
-    pthread_attr_destroy(&thread_attr);
-
-    return thread_id;
-  }
-
-
-  // Methode de destruction d'un job
-  void BatchManager_Local::cancel(pthread_t thread_id)
-  {
-    pthread_cancel(thread_id);
+    return running;
   }
 
 
@@ -283,6 +294,26 @@ namespace Batch {
       }
     }
 
+    if (param.find(INFILE) != param.end()) {
+      Versatile V = param[INFILE];
+      for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
+        Couple cpl = * static_cast<CoupleType*>(*it);
+        string remote = cpl.getRemote();
+        if (remote == "stdin")
+        exec_sub_cmd << " <stdin";
+      }
+    }
+
+    if (param.find(OUTFILE) != param.end()) {
+      Versatile V = param[OUTFILE];
+      for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
+        Couple cpl = * static_cast<CoupleType*>(*it);
+        string remote = cpl.getRemote();
+        if (remote == "stdout") exec_sub_cmd << " 1>stdout";
+        if (remote == "stderr") exec_sub_cmd << " 2>stderr";
+      }
+    }
+
     string user;
     Parametre::const_iterator it = param.find(USER);
     if (it != param.end()) {
@@ -293,79 +324,10 @@ namespace Batch {
   }
 
 
-  // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
-  // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
-  // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
-  // Thread_id / Id (_thread_id_id_association_mutex)
-  BatchManager_Local::Id BatchManager_Local::nextId()
-  {
-    static Id id = 0;
-    Id nextId = id++;
-    //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
-    return nextId;
-  }
-
-
-  // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
-  BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
-  {
-    // @@@ --------> SECTION CRITIQUE <-------- @@@
-    pthread_mutex_lock(&_thread_id_id_association_mutex);
-    bool threadIdFound = false;
-    std::list<struct ThreadIdIdAssociation>::iterator it;
-    while (!threadIdFound) {
-      for (it = _thread_id_id_association.begin() ;
-           it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
-           it++);
-      if (it == _thread_id_id_association.end())
-        pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
-      else
-        threadIdFound = true;
-    }
-
-    Id id = it->id;
-    _thread_id_id_association.erase(it);
-
-    pthread_mutex_unlock(&_thread_id_id_association_mutex);
-    // @@@ --------> SECTION CRITIQUE <-------- @@@
-
-    //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
-    return id;
-  }
-
-
-  // Associe un Thread_id a un Id nouvellement cree
-  BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
-  {
-    Id id = -1;
-
-    // @@@ --------> SECTION CRITIQUE <-------- @@@
-    pthread_mutex_lock(&_thread_id_id_association_mutex);
-    std::list<struct ThreadIdIdAssociation>::iterator it;
-    for (it = _thread_id_id_association.begin() ;
-         it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
-         it++);
-    if (it == _thread_id_id_association.end()) {
-      struct ThreadIdIdAssociation newAssociation;
-      id = newAssociation.id = nextId();
-      newAssociation.threadId = thread_id;
-      _thread_id_id_association.push_back(newAssociation);
-      pthread_cond_signal(&_thread_id_id_association_cond);
-
-    } else {
-      UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
-    }
-    pthread_mutex_unlock(&_thread_id_id_association_mutex);
-    // @@@ --------> SECTION CRITIQUE <-------- @@@
-
-    //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
-    return id;
-  }
-
 
   // Constructeur de la classe ThreadAdapter
-  BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
-  _bm(bm), _job(job)
+  BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
+  _bm(bm), _job(job), _id(id)
   {
     // Nothing to do
   }
@@ -375,6 +337,8 @@ namespace Batch {
   // Methode d'execution du thread
   void * BatchManager_Local::ThreadAdapter::run(void * arg)
   {
+    ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+
 #ifndef WIN32
     // On bloque tous les signaux pour ce thread
     sigset_t setmask;
@@ -391,12 +355,9 @@ namespace Batch {
     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
     // sera prise en compte par pthread_testcancel()
     Process child;
-    pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
-
-    ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
-
-
+    pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
+    pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
 
 
     // Le code retour cumule (ORed) de tous les appels
@@ -443,6 +404,9 @@ namespace Batch {
         string local    = cp.getLocal();
         string remote   = cp.getRemote();
 
+       std::cerr << workdir << std::endl;
+       std::cerr << remote << std::endl;
+
         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
                                                                     workdir + "/" + remote,
                                                                     executionhost, user);
@@ -463,6 +427,8 @@ namespace Batch {
     // On forke/exec un nouveau process pour pouvoir controler le fils
     // (plus finement qu'avec un appel system)
     // int rc = system(commande.c_str());
+    //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
+    //execv("/usr/bin/ssh", parmList);
 #ifdef WIN32
     child = p_ta->launchWin32ChildProcess();
     p_ta->pere(child);
@@ -520,7 +486,10 @@ namespace Batch {
       }
     }
 
+    pthread_mutex_lock(&p_ta->_bm._threads_mutex);
 
+    // Set the job state to FINISHED or FAILED
+    p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
 
     // On retire la fonction de nettoyage de la memoire
     pthread_cleanup_pop(0);
@@ -528,7 +497,10 @@ namespace Batch {
     // On retire la fonction de suppression du fils
     pthread_cleanup_pop(0);
 
+    // remove setFailedOnCancel function from cancel stack
+    pthread_cleanup_pop(0);
 
+    pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
 
     // On invoque la fonction de nettoyage de la memoire
     delete_on_exit(arg);
@@ -547,33 +519,30 @@ namespace Batch {
 
     // On enregistre le fils dans la table des threads
     pthread_t thread_id = pthread_self();
-    Id id = _bm.registerThread_id(thread_id);
 
     Parametre param   = _job.getParametre();
     Environnement env = _job.getEnvironnement();
 
-    ostringstream thread_id_sst;
-    thread_id_sst << id;
-    param[ID]         = thread_id_sst.str();
-    param[STATE]      = "Running";
+    ostringstream id_sst;
+    id_sst << _id;
+    param[ID]         = id_sst.str();
+    param[STATE]      = Batch::RUNNING;
 #ifndef WIN32
     param[PID]        = child;
 #endif
 
-    // @@@ --------> SECTION CRITIQUE <-------- @@@
-    pthread_mutex_lock(&_bm._threads_mutex);
-    _bm._threads[id].thread_id = thread_id;
+    _bm._threads[_id].thread_id = thread_id;
 #ifndef WIN32
-    _bm._threads[id].pid       = child;
+    _bm._threads[_id].pid       = child;
 #endif
-    _bm._threads[id].status    = RUNNING;
-    _bm._threads[id].param     = param;
-    _bm._threads[id].env       = env;
-    _bm._threads[id].command_queue.push(NOP);
-    pthread_mutex_unlock(&_bm._threads_mutex);
-    // @@@ --------> SECTION CRITIQUE <-------- @@@
-
+    _bm._threads[_id].hasFailed = false;
+    _bm._threads[_id].param     = param;
+    _bm._threads[_id].env       = env;
+    _bm._threads[_id].command_queue.push(NOP);
 
+    // Unlock the master thread. From here, all shared variables must be protected
+    // from concurrent access
+    pthread_cond_signal(&_bm._threadSyncCondition);
 
 
     // on boucle en attendant que le fils ait termine
@@ -582,11 +551,6 @@ namespace Batch {
       DWORD exitCode;
       BOOL res = GetExitCodeProcess(child, &exitCode);
       if (exitCode != STILL_ACTIVE) {
-        pthread_mutex_lock(&_bm._threads_mutex);
-        _bm._threads[id].status       = DONE;
-        _bm._threads[id].param[STATE] = "Done";
-        pthread_mutex_unlock(&_bm._threads_mutex);
-        // @@@ --------> SECTION CRITIQUE <-------- @@@
         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
         break;
       }
@@ -594,6 +558,16 @@ namespace Batch {
       int child_rc = 0;
       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
       if (child_wait_rc > 0) {
+        UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
+        UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
+        UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
+        UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
+        UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
+        UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
+        UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
+#ifdef WIFCONTINUED
+        UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
+#endif
         if (WIFSTOPPED(child_rc)) {
           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
@@ -603,8 +577,7 @@ namespace Batch {
           // Le fils est simplement stoppe
           // @@@ --------> SECTION CRITIQUE <-------- @@@
           pthread_mutex_lock(&_bm._threads_mutex);
-          _bm._threads[id].status       = STOPPED;
-          _bm._threads[id].param[STATE] = "Stopped";
+          _bm._threads[_id].param[STATE] = Batch::PAUSED;
           pthread_mutex_unlock(&_bm._threads_mutex);
           // @@@ --------> SECTION CRITIQUE <-------- @@@
           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
@@ -612,12 +585,6 @@ namespace Batch {
         }
         else {
           // Le fils est termine, on sort de la boucle et du if englobant
-          // @@@ --------> SECTION CRITIQUE <-------- @@@
-          pthread_mutex_lock(&_bm._threads_mutex);
-          _bm._threads[id].status       = DONE;
-          _bm._threads[id].param[STATE] = "Done";
-          pthread_mutex_unlock(&_bm._threads_mutex);
-          // @@@ --------> SECTION CRITIQUE <-------- @@@
           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
           break;
         }
@@ -626,8 +593,7 @@ namespace Batch {
         // Le fils a disparu ...
         // @@@ --------> SECTION CRITIQUE <-------- @@@
         pthread_mutex_lock(&_bm._threads_mutex);
-        _bm._threads[id].status       = DEAD;
-        _bm._threads[id].param[STATE] = "Dead";
+        _bm._threads[_id].hasFailed = true;
         pthread_mutex_unlock(&_bm._threads_mutex);
         // @@@ --------> SECTION CRITIQUE <-------- @@@
         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
@@ -642,32 +608,32 @@ namespace Batch {
 
       // On regarde si le fils n'a pas depasse son temps (wallclock time)
       time_t child_currenttime = time(NULL);
-      time_t child_elapsedtime = child_currenttime - child_starttime;
+      long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
       if (param.find(MAXWALLTIME) != param.end()) {
-        int maxwalltime = param[MAXWALLTIME];
+        long maxwalltime = param[MAXWALLTIME];
         //       cout << "child_starttime          = " << child_starttime        << endl
         //            << "child_currenttime        = " << child_currenttime      << endl
         //            << "child_elapsedtime        = " << child_elapsedtime      << endl
         //            << "maxwalltime              = " << maxwalltime            << endl
         //            << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
-        if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
-          UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
+        if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
+          UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
           // On introduit une commande dans la queue du thread
           // @@@ --------> SECTION CRITIQUE <-------- @@@
           pthread_mutex_lock(&_bm._threads_mutex);
-          if (_bm._threads.find(id) != _bm._threads.end())
-            _bm._threads[id].command_queue.push(KILL);
+          if (_bm._threads.find(_id) != _bm._threads.end())
+            _bm._threads[_id].command_queue.push(KILL);
           pthread_mutex_unlock(&_bm._threads_mutex);
           // @@@ --------> SECTION CRITIQUE <-------- @@@
 
 
-        } else if (child_elapsedtime > maxwalltime ) {
-          UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
+        } else if (child_elapsedtime_minutes > maxwalltime ) {
+          UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
           // On introduit une commande dans la queue du thread
           // @@@ --------> SECTION CRITIQUE <-------- @@@
           pthread_mutex_lock(&_bm._threads_mutex);
-          if (_bm._threads.find(id) != _bm._threads.end())
-            _bm._threads[id].command_queue.push(TERM);
+          if (_bm._threads.find(_id) != _bm._threads.end())
+            _bm._threads[_id].command_queue.push(TERM);
           pthread_mutex_unlock(&_bm._threads_mutex);
           // @@@ --------> SECTION CRITIQUE <-------- @@@
         }
@@ -678,10 +644,10 @@ namespace Batch {
       // On regarde s'il y a quelque chose a faire dans la queue de commande
       // @@@ --------> SECTION CRITIQUE <-------- @@@
       pthread_mutex_lock(&_bm._threads_mutex);
-      if (_bm._threads.find(id) != _bm._threads.end()) {
-        while (_bm._threads[id].command_queue.size() > 0) {
-          Commande cmd = _bm._threads[id].command_queue.front();
-          _bm._threads[id].command_queue.pop();
+      if (_bm._threads.find(_id) != _bm._threads.end()) {
+        while (_bm._threads[_id].command_queue.size() > 0) {
+          Commande cmd = _bm._threads[_id].command_queue.front();
+          _bm._threads[_id].command_queue.pop();
 
           switch (cmd) {
     case NOP:
@@ -740,6 +706,10 @@ namespace Batch {
     Parametre param = _job.getParametre();
     Parametre::iterator it;
 
+      //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+      //int result = execv("/usr/bin/ssh", parmList);
+      //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+      //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
     try {
 
       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
@@ -761,6 +731,7 @@ namespace Batch {
       argv[command.size()] = NULL;
 
       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
+      UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
 
       // Create the environment for the new process. Note (RB): Here we change the environment for
       // the process launched in local. It would seem more logical to set the environment for the
@@ -784,6 +755,10 @@ namespace Batch {
         envp[i] = NULL;
       }
 
+      //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+      //int result = execv("/usr/bin/ssh", parmList);
+      //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+      //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
 
 
 
@@ -807,12 +782,16 @@ namespace Batch {
       if (param.find(MAXRAMSIZE) != param.end()) {
         int maxramsize = param[MAXRAMSIZE];
         struct rlimit limit;
-        limit.rlim_cur = maxramsize * 1024;
-        limit.rlim_max = int(maxramsize * 1.1) * 1024;
+        limit.rlim_cur = maxramsize * 1024 * 1024;
+        limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
         setrlimit(RLIMIT_AS, &limit);
       }
 
 
+      //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+      //int result = execv("/usr/bin/ssh", parmList);
+      //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+      //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
 
       // On cree une session pour le fils de facon a ce qu'il ne soit pas
       // detruit lorsque le shell se termine (le shell ouvre une session et
@@ -828,7 +807,7 @@ namespace Batch {
 
       // On execute la commande du fils
       execve(argv[0], argv, envp);
-
+      UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
       // No need to deallocate since nothing happens after a successful exec
 
       // Normalement on ne devrait jamais arriver ici
@@ -942,4 +921,15 @@ namespace Batch {
     delete p_ta;
   }
 
+  void BatchManager_Local::setFailedOnCancel(void * arg)
+  {
+    ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+    pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
+    p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
+    pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
+
+    // Unlock the master thread. From here, the batch manager instance should not be used.
+    pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
+  }
+
 }