Salome HOME
Rename files:
[tools/libbatch.git] / src / Local / BatchManager_Local.cxx
1 //  Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Constants.hxx"
55 #include "IOMutex.hxx"
56 #include "BatchManager_Local.hxx"
57 #include "RunTimeException.hxx"
58
59 using namespace std;
60
61 namespace Batch {
62
63
64   // Constructeur
65   BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
66                                          const char * username,
67                                          CommunicationProtocolType protocolType, const char * mpiImpl)
68     : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
69       _idCounter(0)
70   {
71     pthread_mutex_init(&_threads_mutex, NULL);
72     pthread_cond_init(&_threadSyncCondition, NULL);
73   }
74
75   // Destructeur
76   BatchManager_Local::~BatchManager_Local()
77   {
78     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
79       pthread_mutex_lock(&_threads_mutex);
80       string state = iter->second.param[STATE];
81       if (state != FINISHED && state != FAILED) {
82         UNDER_LOCK( cout << "Warning: Job " << iter->first <<
83                             " is not finished, it will now be canceled." << endl );
84         pthread_cancel(iter->second.thread_id);
85         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
86       }
87       pthread_mutex_unlock(&_threads_mutex);
88     }
89     pthread_mutex_destroy(&_threads_mutex);
90     pthread_cond_destroy(&_threadSyncCondition);
91   }
92
93   // Methode pour le controle des jobs : soumet un job au gestionnaire
94   const JobId BatchManager_Local::submitJob(const Job & job)
95   {
96     // export input files in the working directory of the execution host
97     exportInputFiles(job);
98
99     Job_Local jobLocal = job;
100     Id id = _idCounter++;
101     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
102
103     // Les attributs du thread a sa creation
104     pthread_attr_t thread_attr;
105     pthread_attr_init(&thread_attr);
106     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
107
108     // Creation du thread qui va executer la commande systeme qu'on lui passe
109     pthread_t thread_id;
110     pthread_mutex_lock(&_threads_mutex);
111     int rc = pthread_create(&thread_id,
112       &thread_attr,
113       &ThreadAdapter::run,
114       static_cast<void *>(p_ta));
115
116     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
117     pthread_attr_destroy(&thread_attr);
118
119     if (rc != 0) {
120       pthread_mutex_unlock(&_threads_mutex);
121       throw RunTimeException("Can't create new thread in BatchManager_Local");
122     }
123
124     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
125     pthread_mutex_unlock(&_threads_mutex);
126
127     ostringstream id_sst;
128     id_sst << id;
129     return JobId(this, id_sst.str());
130   }
131
132   // Methode pour le controle des jobs : retire un job du gestionnaire
133   void BatchManager_Local::deleteJob(const JobId & jobid)
134   {
135     Id id;
136
137     istringstream iss(jobid.getReference());
138     iss >> id;
139
140     // @@@ --------> SECTION CRITIQUE <-------- @@@
141     pthread_mutex_lock(&_threads_mutex);
142     bool idFound = (_threads.find(id) != _threads.end());
143     if (idFound) {
144       string state = _threads[id].param[STATE];
145       if (state != FINISHED && state != FAILED) {
146         pthread_cancel(_threads[id].thread_id);
147         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
148       } else {
149         cout << "Cannot delete job " << jobid.getReference() <<
150                 ". Job is already finished." << endl;
151       }
152     }
153     pthread_mutex_unlock(&_threads_mutex);
154     // @@@ --------> SECTION CRITIQUE <-------- @@@
155
156     if (!idFound)
157       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
158   }
159
160   // Methode pour le controle des jobs : suspend un job en file d'attente
161   void BatchManager_Local::holdJob(const JobId & jobid)
162   {
163     Id id;
164     istringstream iss(jobid.getReference());
165     iss >> id;
166
167     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
168
169     // On introduit une commande dans la queue du thread
170     // @@@ --------> SECTION CRITIQUE <-------- @@@
171     pthread_mutex_lock(&_threads_mutex);
172     if (_threads.find(id) != _threads.end())
173       _threads[id].command_queue.push(HOLD);
174     pthread_mutex_unlock(&_threads_mutex);
175     // @@@ --------> SECTION CRITIQUE <-------- @@@
176   }
177
178   // Methode pour le controle des jobs : relache un job suspendu
179   void BatchManager_Local::releaseJob(const JobId & jobid)
180   {
181     Id id;
182     istringstream iss(jobid.getReference());
183     iss >> id;
184
185     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
186
187     // On introduit une commande dans la queue du thread
188     // @@@ --------> SECTION CRITIQUE <-------- @@@
189     pthread_mutex_lock(&_threads_mutex);
190     if (_threads.find(id) != _threads.end())
191       _threads[id].command_queue.push(RELEASE);
192     pthread_mutex_unlock(&_threads_mutex);
193     // @@@ --------> SECTION CRITIQUE <-------- @@@
194   }
195
196
197   // Methode pour le controle des jobs : modifie un job en file d'attente
198   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
199   {
200   }
201
202   // Methode pour le controle des jobs : modifie un job en file d'attente
203   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
204   {
205     alterJob(jobid, param, Environnement());
206   }
207
208   // Methode pour le controle des jobs : modifie un job en file d'attente
209   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
210   {
211     alterJob(jobid, Parametre(), env);
212   }
213
214
215
216   // Methode pour le controle des jobs : renvoie l'etat du job
217   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
218   {
219     Id id;
220     istringstream iss(jobid.getReference());
221     iss >> id;
222
223     Parametre param;
224     Environnement env;
225
226     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
227     // @@@ --------> SECTION CRITIQUE <-------- @@@
228     pthread_mutex_lock(&_threads_mutex);
229     std::map<Id, Child >::iterator pos = _threads.find(id);
230     bool found = (pos != _threads.end());
231     if (found) {
232       param = pos->second.param;
233       env   = pos->second.env;
234     }
235     pthread_mutex_unlock(&_threads_mutex);
236     // @@@ --------> SECTION CRITIQUE <-------- @@@
237     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
238
239     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
240
241     JobInfo_Local ji(param, env);
242     return ji;
243   }
244
245
246   // Ce manager ne peut pas reprendre un job
247   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
248   // au JobId
249   const Batch::JobId
250   BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
251   {
252     return JobId(this, "undefined");
253   }
254
255   // Methode pour le controle des jobs : teste si un job est present en machine
256   bool BatchManager_Local::isRunning(const JobId & jobid)
257   {
258     Id id;
259     istringstream iss(jobid.getReference());
260     iss >> id;
261
262     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
263     // @@@ --------> SECTION CRITIQUE <-------- @@@
264     pthread_mutex_lock(&_threads_mutex);
265     bool running = (_threads[id].param[STATE].str() == RUNNING);
266     pthread_mutex_unlock(&_threads_mutex);
267     // @@@ --------> SECTION CRITIQUE <-------- @@@
268     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
269
270     return running;
271   }
272
273
274   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
275   {
276     ostringstream exec_sub_cmd;
277
278 #ifdef WIN32
279     char drive[_MAX_DRIVE];
280     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
281     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
282 #endif
283
284     string fileToExecute = param[EXECUTABLE].str();
285     string::size_type p1 = fileToExecute.find_last_of("/");
286     string fileNameToExecute = fileToExecute.substr(p1+1);
287
288     exec_sub_cmd << "cd " << param[WORKDIR] << " && ./" << fileNameToExecute;
289
290     if (param.find(ARGUMENTS) != param.end()) {
291       Versatile V = param[ARGUMENTS];
292       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
293         StringType argt = * static_cast<StringType *>(*it);
294         string     arg  = argt;
295         exec_sub_cmd << " " << arg;
296       }
297     }
298
299     if (param.find(INFILE) != param.end()) {
300       Versatile V = param[INFILE];
301       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
302         Couple cpl = * static_cast<CoupleType*>(*it);
303         string remote = cpl.getRemote();
304         if (remote == "stdin")
305         exec_sub_cmd << " <stdin";
306       }
307     }
308
309     if (param.find(OUTFILE) != param.end()) {
310       Versatile V = param[OUTFILE];
311       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
312         Couple cpl = * static_cast<CoupleType*>(*it);
313         string remote = cpl.getRemote();
314         if (remote == "stdout") exec_sub_cmd << " 1>stdout";
315         if (remote == "stderr") exec_sub_cmd << " 2>stderr";
316       }
317     }
318
319     string user;
320     Parametre::const_iterator it = param.find(USER);
321     if (it != param.end()) {
322       user = string(it->second);
323     }
324
325     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), _hostname, user);
326   }
327
328
329
330   // Constructeur de la classe ThreadAdapter
331   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
332   _bm(bm), _job(job), _id(id)
333   {
334     // Nothing to do
335   }
336
337
338
339   // Methode d'execution du thread
340   void * BatchManager_Local::ThreadAdapter::run(void * arg)
341   {
342     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
343
344 #ifndef WIN32
345     // On bloque tous les signaux pour ce thread
346     sigset_t setmask;
347     sigfillset(&setmask);
348     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
349 #endif
350
351     // On autorise la terminaison differee du thread
352     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
353     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
354     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
355
356     // On enregistre la fonction de suppression du fils en cas d'arret du thread
357     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
358     // sera prise en compte par pthread_testcancel()
359     Process child;
360     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
361     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
362     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
363
364     // On forke/exec un nouveau process pour pouvoir controler le fils
365     // (plus finement qu'avec un appel system)
366     // int rc = system(commande.c_str());
367     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
368     //execv("/usr/bin/ssh", parmList);
369 #ifdef WIN32
370     child = p_ta->launchWin32ChildProcess();
371     p_ta->pere(child);
372 #else
373     child = fork();
374     if (child < 0) { // erreur
375       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
376
377     } else if (child > 0) { // pere
378       p_ta->pere(child);
379
380     } else { // fils
381       p_ta->fils();
382     }
383 #endif
384
385     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
386
387     // Set the job state to FINISHED or FAILED
388     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
389
390     // On retire la fonction de nettoyage de la memoire
391     pthread_cleanup_pop(0);
392
393     // On retire la fonction de suppression du fils
394     pthread_cleanup_pop(0);
395
396     // remove setFailedOnCancel function from cancel stack
397     pthread_cleanup_pop(0);
398
399     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
400
401     // On invoque la fonction de nettoyage de la memoire
402     delete_on_exit(arg);
403
404     UNDER_LOCK( cout << "Father is leaving" << endl );
405     pthread_exit(NULL);
406     return NULL;
407   }
408
409
410
411
412   void BatchManager_Local::ThreadAdapter::pere(Process child)
413   {
414     time_t child_starttime = time(NULL);
415
416     // On enregistre le fils dans la table des threads
417     pthread_t thread_id = pthread_self();
418
419     Parametre param   = _job.getParametre();
420     Environnement env = _job.getEnvironnement();
421
422     ostringstream id_sst;
423     id_sst << _id;
424     param[ID]         = id_sst.str();
425     param[STATE]      = Batch::RUNNING;
426 #ifndef WIN32
427     param[PID]        = child;
428 #endif
429
430     _bm._threads[_id].thread_id = thread_id;
431 #ifndef WIN32
432     _bm._threads[_id].pid       = child;
433 #endif
434     _bm._threads[_id].hasFailed = false;
435     _bm._threads[_id].param     = param;
436     _bm._threads[_id].env       = env;
437     _bm._threads[_id].command_queue.push(NOP);
438
439     // Unlock the master thread. From here, all shared variables must be protected
440     // from concurrent access
441     pthread_cond_signal(&_bm._threadSyncCondition);
442
443
444     // on boucle en attendant que le fils ait termine
445     while (1) {
446 #ifdef WIN32
447       DWORD exitCode;
448       GetExitCodeProcess(child, &exitCode);
449       if (exitCode != STILL_ACTIVE) {
450         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
451         break;
452       }
453 #else
454       int child_rc = 0;
455       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
456       if (child_wait_rc > 0) {
457          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
458          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
459          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
460          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
461          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
462          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
463          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
464 #ifdef WIFCONTINUED
465          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
466 #endif
467         if (WIFSTOPPED(child_rc)) {
468           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
469           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
470           // desactive car s'il est possible de detecter l'arret d'un process, il est
471           // plus difficile de detecter sa reprise.
472
473           // Le fils est simplement stoppe
474           // @@@ --------> SECTION CRITIQUE <-------- @@@
475           pthread_mutex_lock(&_bm._threads_mutex);
476           _bm._threads[_id].param[STATE] = Batch::PAUSED;
477           pthread_mutex_unlock(&_bm._threads_mutex);
478           // @@@ --------> SECTION CRITIQUE <-------- @@@
479           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
480
481         }
482         else {
483           // Le fils est termine, on sort de la boucle et du if englobant
484           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
485           break;
486         }
487       }
488       else if (child_wait_rc == -1) {
489         // Le fils a disparu ...
490         // @@@ --------> SECTION CRITIQUE <-------- @@@
491         pthread_mutex_lock(&_bm._threads_mutex);
492         _bm._threads[_id].hasFailed = true;
493         pthread_mutex_unlock(&_bm._threads_mutex);
494         // @@@ --------> SECTION CRITIQUE <-------- @@@
495         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
496         break;
497       }
498 #endif
499
500       // On teste si le thread doit etre detruit
501       pthread_testcancel();
502
503
504
505       // On regarde si le fils n'a pas depasse son temps (wallclock time)
506       time_t child_currenttime = time(NULL);
507       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
508       if (param.find(MAXWALLTIME) != param.end()) {
509         long maxwalltime = param[MAXWALLTIME];
510         //        cout << "child_starttime          = " << child_starttime        << endl
511         //             << "child_currenttime        = " << child_currenttime      << endl
512         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
513         //             << "maxwalltime              = " << maxwalltime            << endl
514         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
515         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
516           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
517           // On introduit une commande dans la queue du thread
518           // @@@ --------> SECTION CRITIQUE <-------- @@@
519           pthread_mutex_lock(&_bm._threads_mutex);
520           if (_bm._threads.find(_id) != _bm._threads.end())
521             _bm._threads[_id].command_queue.push(KILL);
522           pthread_mutex_unlock(&_bm._threads_mutex);
523           // @@@ --------> SECTION CRITIQUE <-------- @@@
524
525
526         } else if (child_elapsedtime_minutes > maxwalltime ) {
527           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
528           // On introduit une commande dans la queue du thread
529           // @@@ --------> SECTION CRITIQUE <-------- @@@
530           pthread_mutex_lock(&_bm._threads_mutex);
531           if (_bm._threads.find(_id) != _bm._threads.end())
532             _bm._threads[_id].command_queue.push(TERM);
533           pthread_mutex_unlock(&_bm._threads_mutex);
534           // @@@ --------> SECTION CRITIQUE <-------- @@@
535         }
536       }
537
538
539
540       // On regarde s'il y a quelque chose a faire dans la queue de commande
541       // @@@ --------> SECTION CRITIQUE <-------- @@@
542       pthread_mutex_lock(&_bm._threads_mutex);
543       if (_bm._threads.find(_id) != _bm._threads.end()) {
544         while (_bm._threads[_id].command_queue.size() > 0) {
545           Commande cmd = _bm._threads[_id].command_queue.front();
546           _bm._threads[_id].command_queue.pop();
547
548           switch (cmd) {
549     case NOP:
550       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
551       break;
552 #ifndef WIN32
553     case HOLD:
554       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
555       kill(child, SIGSTOP);
556       break;
557
558     case RELEASE:
559       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
560       kill(child, SIGCONT);
561       break;
562
563     case TERM:
564       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
565       kill(child, SIGTERM);
566       break;
567
568     case KILL:
569       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
570       kill(child, SIGKILL);
571       break;
572 #endif
573     case ALTER:
574       break;
575
576     default:
577       break;
578           }
579         }
580
581       }
582       pthread_mutex_unlock(&_bm._threads_mutex);
583       // @@@ --------> SECTION CRITIQUE <-------- @@@
584
585       // On fait une petite pause pour ne pas surcharger inutilement le processeur
586 #ifdef WIN32
587       Sleep(1000);
588 #else
589       sleep(1);
590 #endif
591
592     }
593
594   }
595
596
597
598 #ifndef WIN32
599
600   void BatchManager_Local::ThreadAdapter::fils()
601   {
602     Parametre param = _job.getParametre();
603     Parametre::iterator it;
604
605       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
606       //int result = execv("/usr/bin/ssh", parmList);
607       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
608       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
609     try {
610
611       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
612       vector<string> command;
613       if (param.find(EXECUTABLE) != param.end()) {
614         command = _bm.exec_command(param);
615       } else exit(1);
616
617       // Build the argument array argv from the command
618       char ** argv = new char * [command.size() + 1];
619       string comstr;
620       for (string::size_type i=0 ; i<command.size() ; i++) {
621         argv[i] = new char[command[i].size() + 1];
622         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
623         if (i>0) comstr += " # ";
624         comstr += command[i];
625       }
626
627       argv[command.size()] = NULL;
628
629       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
630       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
631
632       // Create the environment for the new process. Note (RB): Here we change the environment for
633       // the process launched in local. It would seem more logical to set the environment for the
634       // remote process.
635       Environnement env = _job.getEnvironnement();
636
637       char ** envp = NULL;
638       if(env.size() > 0) {
639         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
640         int i = 0;
641         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
642           const string  & key   = (*it).first;
643           const string  & value = (*it).second;
644           ostringstream oss;
645           oss << key << "=" << value;
646           envp[i]         = new char [oss.str().size() + 1];
647           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
648         }
649
650         // assert (i == env.size())
651         envp[i] = NULL;
652       }
653
654       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
655       //int result = execv("/usr/bin/ssh", parmList);
656       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
657       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
658
659
660
661       // On positionne les limites systeme imposees au fils
662       // This part is deactivated because those limits should be set on the job process, not on
663       // the ssh process. If it is done properly one day, beware of the types used (int is not enough)
664       /*
665       if (param.find(MAXCPUTIME) != param.end()) {
666         int maxcputime = param[MAXCPUTIME];
667         struct rlimit limit;
668         limit.rlim_cur = maxcputime;
669         limit.rlim_max = int(maxcputime * 1.1);
670         setrlimit(RLIMIT_CPU, &limit);
671       }
672
673       if (param.find(MAXDISKSIZE) != param.end()) {
674         int maxdisksize = param[MAXDISKSIZE];
675         struct rlimit limit;
676         limit.rlim_cur = maxdisksize * 1024;
677         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
678         setrlimit(RLIMIT_FSIZE, &limit);
679       }
680
681       if (param.find(MAXRAMSIZE) != param.end()) {
682         int maxramsize = param[MAXRAMSIZE];
683         struct rlimit limit;
684         limit.rlim_cur = maxramsize * 1024 * 1024;
685         limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
686         setrlimit(RLIMIT_AS, &limit);
687       }
688       */
689
690
691       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
692       //int result = execv("/usr/bin/ssh", parmList);
693       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
694       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
695
696       // On cree une session pour le fils de facon a ce qu'il ne soit pas
697       // detruit lorsque le shell se termine (le shell ouvre une session et
698       // tue tous les process appartenant a la session en quittant)
699       setsid();
700
701
702       // On ferme les descripteurs de fichiers standards
703       //close(STDIN_FILENO);
704       //close(STDOUT_FILENO);
705       //close(STDERR_FILENO);
706
707
708       // On execute la commande du fils
709       execve(argv[0], argv, envp);
710       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
711       // No need to deallocate since nothing happens after a successful exec
712
713       // Normalement on ne devrait jamais arriver ici
714       ofstream file_err("error.log");
715       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
716
717     } catch (GenericException & e) {
718
719       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
720     }
721
722     exit(99);
723   }
724
725 #else
726
727   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
728   {
729     Parametre param = _job.getParametre();
730     Parametre::iterator it;
731     PROCESS_INFORMATION pi;
732
733     try {
734
735       // EXECUTABLE is MANDATORY, if missing, we throw an exception
736       vector<string> exec_command;
737       if (param.find(EXECUTABLE) != param.end()) {
738         exec_command = _bm.exec_command(param);
739       } else {
740         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
741       }
742
743       // Build the command string from the command argument vector
744       string comstr;
745       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
746         if (i>0) comstr += " ";
747         comstr += exec_command[i];
748       }
749
750       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
751
752       // Create the environment for the new process. Note (RB): Here we change the environment for
753       // the process launched in local. It would seem more logical to set the environment for the
754       // remote process.
755       // Note that if no environment is specified, we reuse the current environment.
756       Environnement env = _job.getEnvironnement();
757       char * chNewEnv = NULL;
758
759       if(env.size() > 0) {
760         chNewEnv = new char[4096];
761         LPTSTR lpszCurrentVariable = chNewEnv;
762         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
763           const string  & key   = (*it).first;
764           const string  & value = (*it).second;
765           string envvar = key + "=" + value;
766           envvar.copy(lpszCurrentVariable, envvar.size());
767           lpszCurrentVariable[envvar.size()] = '\0';
768           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
769         }
770         // Terminate the block with a NULL byte.
771         *lpszCurrentVariable = '\0';
772       }
773
774
775       STARTUPINFO si;
776       ZeroMemory( &si, sizeof(si) );
777       si.cb = sizeof(si);
778       ZeroMemory( &pi, sizeof(pi) );
779
780       // Copy the command to a non-const buffer
781       char * buffer = strdup(comstr.c_str());
782
783       // launch the new process
784       bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
785                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
786
787       if (buffer) free(buffer);
788       if (!res) throw RunTimeException("Error while creating new process");
789
790       CloseHandle(pi.hThread);
791
792     } catch (GenericException & e) {
793
794       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
795     }
796
797     return pi.hProcess;
798   }
799
800 #endif
801
802
803   void BatchManager_Local::kill_child_on_exit(void * p_pid)
804   {
805 #ifndef WIN32
806     //TODO: porting of following functionality
807     pid_t child = * static_cast<pid_t *>(p_pid);
808
809     // On tue le fils
810     kill(child, SIGTERM);
811
812     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
813     // mais cette option n'est pas implementee pour le moment, car il est
814     // preferable de laisser le process fils se terminer normalement et seul.
815 #endif
816   }
817
818   void BatchManager_Local::delete_on_exit(void * arg)
819   {
820     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
821     delete p_ta;
822   }
823
824   void BatchManager_Local::setFailedOnCancel(void * arg)
825   {
826     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
827     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
828     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
829     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
830
831     // Unlock the master thread. From here, the batch manager instance should not be used.
832     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
833   }
834
835 }