Salome HOME
f09b2cdfe23e5e20b0490a43a0412a62c825b92d
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Batch_Constants.hxx"
55 #include "Batch_IOMutex.hxx"
56 #include "Batch_BatchManager_Local.hxx"
57 #include "Batch_RunTimeException.hxx"
58
59 using namespace std;
60
61 namespace Batch {
62
63
64   // Constructeur
65   BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
66                                          const char * username,
67                                          CommunicationProtocolType protocolType, const char * mpiImpl,
68                                          int nb_proc_per_node)
69     : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
70       _idCounter(0)
71   {
72     pthread_mutex_init(&_threads_mutex, NULL);
73     pthread_cond_init(&_threadSyncCondition, NULL);
74   }
75
76   // Destructeur
77   BatchManager_Local::~BatchManager_Local()
78   {
79     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
80       pthread_mutex_lock(&_threads_mutex);
81       string state = iter->second.param[STATE];
82       if (state != FINISHED && state != FAILED) {
83         UNDER_LOCK( cout << "Warning: Job " << iter->first <<
84                             " is not finished, it will now be canceled." << endl );
85         pthread_cancel(iter->second.thread_id);
86         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
87       }
88       pthread_mutex_unlock(&_threads_mutex);
89     }
90     pthread_mutex_destroy(&_threads_mutex);
91     pthread_cond_destroy(&_threadSyncCondition);
92   }
93
94   // Methode pour le controle des jobs : soumet un job au gestionnaire
95   const JobId BatchManager_Local::submitJob(const Job & job)
96   {
97     // export input files in the working directory of the execution host
98     exportInputFiles(job);
99
100     Job_Local jobLocal = job;
101     Id id = _idCounter++;
102     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
103
104     // Les attributs du thread a sa creation
105     pthread_attr_t thread_attr;
106     pthread_attr_init(&thread_attr);
107     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
108
109     // Creation du thread qui va executer la commande systeme qu'on lui passe
110     pthread_t thread_id;
111     pthread_mutex_lock(&_threads_mutex);
112     int rc = pthread_create(&thread_id,
113       &thread_attr,
114       &ThreadAdapter::run,
115       static_cast<void *>(p_ta));
116
117     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
118     pthread_attr_destroy(&thread_attr);
119
120     if (rc != 0) {
121       pthread_mutex_unlock(&_threads_mutex);
122       throw RunTimeException("Can't create new thread in BatchManager_Local");
123     }
124
125     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
126     pthread_mutex_unlock(&_threads_mutex);
127
128     ostringstream id_sst;
129     id_sst << id;
130     return JobId(this, id_sst.str());
131   }
132
133   // Methode pour le controle des jobs : retire un job du gestionnaire
134   void BatchManager_Local::deleteJob(const JobId & jobid)
135   {
136     Id id;
137
138     istringstream iss(jobid.getReference());
139     iss >> id;
140
141     // @@@ --------> SECTION CRITIQUE <-------- @@@
142     pthread_mutex_lock(&_threads_mutex);
143     bool idFound = (_threads.find(id) != _threads.end());
144     if (idFound) {
145       string state = _threads[id].param[STATE];
146       if (state != FINISHED && state != FAILED) {
147         pthread_cancel(_threads[id].thread_id);
148         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
149       } else {
150         cout << "Cannot delete job " << jobid.getReference() <<
151                 ". Job is already finished." << endl;
152       }
153     }
154     pthread_mutex_unlock(&_threads_mutex);
155     // @@@ --------> SECTION CRITIQUE <-------- @@@
156
157     if (!idFound)
158       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
159   }
160
161   // Methode pour le controle des jobs : suspend un job en file d'attente
162   void BatchManager_Local::holdJob(const JobId & jobid)
163   {
164     Id id;
165     istringstream iss(jobid.getReference());
166     iss >> id;
167
168     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
169
170     // On introduit une commande dans la queue du thread
171     // @@@ --------> SECTION CRITIQUE <-------- @@@
172     pthread_mutex_lock(&_threads_mutex);
173     if (_threads.find(id) != _threads.end())
174       _threads[id].command_queue.push(HOLD);
175     pthread_mutex_unlock(&_threads_mutex);
176     // @@@ --------> SECTION CRITIQUE <-------- @@@
177   }
178
179   // Methode pour le controle des jobs : relache un job suspendu
180   void BatchManager_Local::releaseJob(const JobId & jobid)
181   {
182     Id id;
183     istringstream iss(jobid.getReference());
184     iss >> id;
185
186     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
187
188     // On introduit une commande dans la queue du thread
189     // @@@ --------> SECTION CRITIQUE <-------- @@@
190     pthread_mutex_lock(&_threads_mutex);
191     if (_threads.find(id) != _threads.end())
192       _threads[id].command_queue.push(RELEASE);
193     pthread_mutex_unlock(&_threads_mutex);
194     // @@@ --------> SECTION CRITIQUE <-------- @@@
195   }
196
197
198   // Methode pour le controle des jobs : modifie un job en file d'attente
199   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
200   {
201   }
202
203   // Methode pour le controle des jobs : modifie un job en file d'attente
204   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
205   {
206     alterJob(jobid, param, Environnement());
207   }
208
209   // Methode pour le controle des jobs : modifie un job en file d'attente
210   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
211   {
212     alterJob(jobid, Parametre(), env);
213   }
214
215
216
217   // Methode pour le controle des jobs : renvoie l'etat du job
218   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
219   {
220     Id id;
221     istringstream iss(jobid.getReference());
222     iss >> id;
223
224     Parametre param;
225     Environnement env;
226
227     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
228     // @@@ --------> SECTION CRITIQUE <-------- @@@
229     pthread_mutex_lock(&_threads_mutex);
230     std::map<Id, Child >::iterator pos = _threads.find(id);
231     bool found = (pos != _threads.end());
232     if (found) {
233       param = pos->second.param;
234       env   = pos->second.env;
235     }
236     pthread_mutex_unlock(&_threads_mutex);
237     // @@@ --------> SECTION CRITIQUE <-------- @@@
238     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
239
240     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
241
242     JobInfo_Local ji(param, env);
243     return ji;
244   }
245
246
247   // Ce manager ne peut pas reprendre un job
248   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
249   // au JobId
250   const Batch::JobId
251   BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
252   {
253     return JobId(this, "undefined");
254   }
255
256   // Methode pour le controle des jobs : teste si un job est present en machine
257   bool BatchManager_Local::isRunning(const JobId & jobid)
258   {
259     Id id;
260     istringstream iss(jobid.getReference());
261     iss >> id;
262
263     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
264     // @@@ --------> SECTION CRITIQUE <-------- @@@
265     pthread_mutex_lock(&_threads_mutex);
266     bool running = (_threads[id].param[STATE].str() == RUNNING);
267     pthread_mutex_unlock(&_threads_mutex);
268     // @@@ --------> SECTION CRITIQUE <-------- @@@
269     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
270
271     return running;
272   }
273
274
275   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
276   {
277     ostringstream exec_sub_cmd;
278
279 #ifdef WIN32
280     char drive[_MAX_DRIVE];
281     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
282     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
283 #endif
284
285     string fileToExecute = param[EXECUTABLE].str();
286     string::size_type p1 = fileToExecute.find_last_of("/");
287     string fileNameToExecute = fileToExecute.substr(p1+1);
288
289     exec_sub_cmd << "cd " << param[WORKDIR] << " && ./" << fileNameToExecute;
290
291     if (param.find(ARGUMENTS) != param.end()) {
292       Versatile V = param[ARGUMENTS];
293       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
294         StringType argt = * static_cast<StringType *>(*it);
295         string     arg  = argt;
296         exec_sub_cmd << " " << arg;
297       }
298     }
299
300     if (param.find(INFILE) != param.end()) {
301       Versatile V = param[INFILE];
302       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
303         Couple cpl = * static_cast<CoupleType*>(*it);
304         string remote = cpl.getRemote();
305         if (remote == "stdin")
306         exec_sub_cmd << " <stdin";
307       }
308     }
309
310     if (param.find(OUTFILE) != param.end()) {
311       Versatile V = param[OUTFILE];
312       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
313         Couple cpl = * static_cast<CoupleType*>(*it);
314         string remote = cpl.getRemote();
315         if (remote == "stdout") exec_sub_cmd << " 1>stdout";
316         if (remote == "stderr") exec_sub_cmd << " 2>stderr";
317       }
318     }
319
320     string user;
321     Parametre::const_iterator it = param.find(USER);
322     if (it != param.end()) {
323       user = string(it->second);
324     }
325
326     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), _hostname, user);
327   }
328
329
330
331   // Constructeur de la classe ThreadAdapter
332   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
333   _bm(bm), _job(job), _id(id)
334   {
335     // Nothing to do
336   }
337
338
339
340   // Methode d'execution du thread
341   void * BatchManager_Local::ThreadAdapter::run(void * arg)
342   {
343     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
344
345 #ifndef WIN32
346     // On bloque tous les signaux pour ce thread
347     sigset_t setmask;
348     sigfillset(&setmask);
349     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
350 #endif
351
352     // On autorise la terminaison differee du thread
353     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
354     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
355     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
356
357     // On enregistre la fonction de suppression du fils en cas d'arret du thread
358     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
359     // sera prise en compte par pthread_testcancel()
360     Process child;
361     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
362     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
363     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
364
365     // On forke/exec un nouveau process pour pouvoir controler le fils
366     // (plus finement qu'avec un appel system)
367     // int rc = system(commande.c_str());
368     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
369     //execv("/usr/bin/ssh", parmList);
370 #ifdef WIN32
371     child = p_ta->launchWin32ChildProcess();
372     p_ta->pere(child);
373 #else
374     child = fork();
375     if (child < 0) { // erreur
376       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
377
378     } else if (child > 0) { // pere
379       p_ta->pere(child);
380
381     } else { // fils
382       p_ta->fils();
383     }
384 #endif
385
386     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
387
388     // Set the job state to FINISHED or FAILED
389     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
390
391     // On retire la fonction de nettoyage de la memoire
392     pthread_cleanup_pop(0);
393
394     // On retire la fonction de suppression du fils
395     pthread_cleanup_pop(0);
396
397     // remove setFailedOnCancel function from cancel stack
398     pthread_cleanup_pop(0);
399
400     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
401
402     // On invoque la fonction de nettoyage de la memoire
403     delete_on_exit(arg);
404
405     UNDER_LOCK( cout << "Father is leaving" << endl );
406     pthread_exit(NULL);
407     return NULL;
408   }
409
410
411
412
413   void BatchManager_Local::ThreadAdapter::pere(Process child)
414   {
415     time_t child_starttime = time(NULL);
416
417     // On enregistre le fils dans la table des threads
418     pthread_t thread_id = pthread_self();
419
420     Parametre param   = _job.getParametre();
421     Environnement env = _job.getEnvironnement();
422
423     ostringstream id_sst;
424     id_sst << _id;
425     param[ID]         = id_sst.str();
426     param[STATE]      = Batch::RUNNING;
427 #ifndef WIN32
428     param[PID]        = child;
429 #endif
430
431     _bm._threads[_id].thread_id = thread_id;
432 #ifndef WIN32
433     _bm._threads[_id].pid       = child;
434 #endif
435     _bm._threads[_id].hasFailed = false;
436     _bm._threads[_id].param     = param;
437     _bm._threads[_id].env       = env;
438     _bm._threads[_id].command_queue.push(NOP);
439
440     // Unlock the master thread. From here, all shared variables must be protected
441     // from concurrent access
442     pthread_cond_signal(&_bm._threadSyncCondition);
443
444
445     // on boucle en attendant que le fils ait termine
446     while (1) {
447 #ifdef WIN32
448       DWORD exitCode;
449       GetExitCodeProcess(child, &exitCode);
450       if (exitCode != STILL_ACTIVE) {
451         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
452         break;
453       }
454 #else
455       int child_rc = 0;
456       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
457       if (child_wait_rc > 0) {
458          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
459          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
460          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
461          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
462          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
463          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
464          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
465 #ifdef WIFCONTINUED
466          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
467 #endif
468         if (WIFSTOPPED(child_rc)) {
469           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
470           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
471           // desactive car s'il est possible de detecter l'arret d'un process, il est
472           // plus difficile de detecter sa reprise.
473
474           // Le fils est simplement stoppe
475           // @@@ --------> SECTION CRITIQUE <-------- @@@
476           pthread_mutex_lock(&_bm._threads_mutex);
477           _bm._threads[_id].param[STATE] = Batch::PAUSED;
478           pthread_mutex_unlock(&_bm._threads_mutex);
479           // @@@ --------> SECTION CRITIQUE <-------- @@@
480           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
481
482         }
483         else {
484           // Le fils est termine, on sort de la boucle et du if englobant
485           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
486           break;
487         }
488       }
489       else if (child_wait_rc == -1) {
490         // Le fils a disparu ...
491         // @@@ --------> SECTION CRITIQUE <-------- @@@
492         pthread_mutex_lock(&_bm._threads_mutex);
493         _bm._threads[_id].hasFailed = true;
494         pthread_mutex_unlock(&_bm._threads_mutex);
495         // @@@ --------> SECTION CRITIQUE <-------- @@@
496         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
497         break;
498       }
499 #endif
500
501       // On teste si le thread doit etre detruit
502       pthread_testcancel();
503
504
505
506       // On regarde si le fils n'a pas depasse son temps (wallclock time)
507       time_t child_currenttime = time(NULL);
508       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
509       if (param.find(MAXWALLTIME) != param.end()) {
510         long maxwalltime = param[MAXWALLTIME];
511         //        cout << "child_starttime          = " << child_starttime        << endl
512         //             << "child_currenttime        = " << child_currenttime      << endl
513         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
514         //             << "maxwalltime              = " << maxwalltime            << endl
515         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
516         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
517           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
518           // On introduit une commande dans la queue du thread
519           // @@@ --------> SECTION CRITIQUE <-------- @@@
520           pthread_mutex_lock(&_bm._threads_mutex);
521           if (_bm._threads.find(_id) != _bm._threads.end())
522             _bm._threads[_id].command_queue.push(KILL);
523           pthread_mutex_unlock(&_bm._threads_mutex);
524           // @@@ --------> SECTION CRITIQUE <-------- @@@
525
526
527         } else if (child_elapsedtime_minutes > maxwalltime ) {
528           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
529           // On introduit une commande dans la queue du thread
530           // @@@ --------> SECTION CRITIQUE <-------- @@@
531           pthread_mutex_lock(&_bm._threads_mutex);
532           if (_bm._threads.find(_id) != _bm._threads.end())
533             _bm._threads[_id].command_queue.push(TERM);
534           pthread_mutex_unlock(&_bm._threads_mutex);
535           // @@@ --------> SECTION CRITIQUE <-------- @@@
536         }
537       }
538
539
540
541       // On regarde s'il y a quelque chose a faire dans la queue de commande
542       // @@@ --------> SECTION CRITIQUE <-------- @@@
543       pthread_mutex_lock(&_bm._threads_mutex);
544       if (_bm._threads.find(_id) != _bm._threads.end()) {
545         while (_bm._threads[_id].command_queue.size() > 0) {
546           Commande cmd = _bm._threads[_id].command_queue.front();
547           _bm._threads[_id].command_queue.pop();
548
549           switch (cmd) {
550     case NOP:
551       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
552       break;
553 #ifndef WIN32
554     case HOLD:
555       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
556       kill(child, SIGSTOP);
557       break;
558
559     case RELEASE:
560       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
561       kill(child, SIGCONT);
562       break;
563
564     case TERM:
565       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
566       kill(child, SIGTERM);
567       break;
568
569     case KILL:
570       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
571       kill(child, SIGKILL);
572       break;
573 #endif
574     case ALTER:
575       break;
576
577     default:
578       break;
579           }
580         }
581
582       }
583       pthread_mutex_unlock(&_bm._threads_mutex);
584       // @@@ --------> SECTION CRITIQUE <-------- @@@
585
586       // On fait une petite pause pour ne pas surcharger inutilement le processeur
587 #ifdef WIN32
588       Sleep(1000);
589 #else
590       sleep(1);
591 #endif
592
593     }
594
595   }
596
597
598
599 #ifndef WIN32
600
601   void BatchManager_Local::ThreadAdapter::fils()
602   {
603     Parametre param = _job.getParametre();
604     Parametre::iterator it;
605
606       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
607       //int result = execv("/usr/bin/ssh", parmList);
608       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
609       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
610     try {
611
612       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
613       vector<string> command;
614       if (param.find(EXECUTABLE) != param.end()) {
615         command = _bm.exec_command(param);
616       } else exit(1);
617
618       // Build the argument array argv from the command
619       char ** argv = new char * [command.size() + 1];
620       string comstr;
621       for (string::size_type i=0 ; i<command.size() ; i++) {
622         argv[i] = new char[command[i].size() + 1];
623         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
624         if (i>0) comstr += " # ";
625         comstr += command[i];
626       }
627
628       argv[command.size()] = NULL;
629
630       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
631       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
632
633       // Create the environment for the new process. Note (RB): Here we change the environment for
634       // the process launched in local. It would seem more logical to set the environment for the
635       // remote process.
636       Environnement env = _job.getEnvironnement();
637
638       char ** envp = NULL;
639       if(env.size() > 0) {
640         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
641         int i = 0;
642         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
643           const string  & key   = (*it).first;
644           const string  & value = (*it).second;
645           ostringstream oss;
646           oss << key << "=" << value;
647           envp[i]         = new char [oss.str().size() + 1];
648           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
649         }
650
651         // assert (i == env.size())
652         envp[i] = NULL;
653       }
654
655       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
656       //int result = execv("/usr/bin/ssh", parmList);
657       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
658       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
659
660
661
662       // On positionne les limites systeme imposees au fils
663       // This part is deactivated because those limits should be set on the job process, not on
664       // the ssh process. If it is done properly one day, beware of the types used (int is not enough)
665       /*
666       if (param.find(MAXCPUTIME) != param.end()) {
667         int maxcputime = param[MAXCPUTIME];
668         struct rlimit limit;
669         limit.rlim_cur = maxcputime;
670         limit.rlim_max = int(maxcputime * 1.1);
671         setrlimit(RLIMIT_CPU, &limit);
672       }
673
674       if (param.find(MAXDISKSIZE) != param.end()) {
675         int maxdisksize = param[MAXDISKSIZE];
676         struct rlimit limit;
677         limit.rlim_cur = maxdisksize * 1024;
678         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
679         setrlimit(RLIMIT_FSIZE, &limit);
680       }
681
682       if (param.find(MAXRAMSIZE) != param.end()) {
683         int maxramsize = param[MAXRAMSIZE];
684         struct rlimit limit;
685         limit.rlim_cur = maxramsize * 1024 * 1024;
686         limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
687         setrlimit(RLIMIT_AS, &limit);
688       }
689       */
690
691
692       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
693       //int result = execv("/usr/bin/ssh", parmList);
694       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
695       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
696
697       // On cree une session pour le fils de facon a ce qu'il ne soit pas
698       // detruit lorsque le shell se termine (le shell ouvre une session et
699       // tue tous les process appartenant a la session en quittant)
700       setsid();
701
702
703       // On ferme les descripteurs de fichiers standards
704       //close(STDIN_FILENO);
705       //close(STDOUT_FILENO);
706       //close(STDERR_FILENO);
707
708
709       // On execute la commande du fils
710       execve(argv[0], argv, envp);
711       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
712       // No need to deallocate since nothing happens after a successful exec
713
714       // Normalement on ne devrait jamais arriver ici
715       ofstream file_err("error.log");
716       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
717
718     } catch (GenericException & e) {
719
720       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
721     }
722
723     exit(99);
724   }
725
726 #else
727
728   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
729   {
730     Parametre param = _job.getParametre();
731     Parametre::iterator it;
732     PROCESS_INFORMATION pi;
733
734     try {
735
736       // EXECUTABLE is MANDATORY, if missing, we throw an exception
737       vector<string> exec_command;
738       if (param.find(EXECUTABLE) != param.end()) {
739         exec_command = _bm.exec_command(param);
740       } else {
741         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
742       }
743
744       // Build the command string from the command argument vector
745       string comstr;
746       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
747         if (i>0) comstr += " ";
748         comstr += exec_command[i];
749       }
750
751       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
752
753       // Create the environment for the new process. Note (RB): Here we change the environment for
754       // the process launched in local. It would seem more logical to set the environment for the
755       // remote process.
756       // Note that if no environment is specified, we reuse the current environment.
757       Environnement env = _job.getEnvironnement();
758       char * chNewEnv = NULL;
759
760       if(env.size() > 0) {
761         chNewEnv = new char[4096];
762         LPTSTR lpszCurrentVariable = chNewEnv;
763         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
764           const string  & key   = (*it).first;
765           const string  & value = (*it).second;
766           string envvar = key + "=" + value;
767           envvar.copy(lpszCurrentVariable, envvar.size());
768           lpszCurrentVariable[envvar.size()] = '\0';
769           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
770         }
771         // Terminate the block with a NULL byte.
772         *lpszCurrentVariable = '\0';
773       }
774
775
776       STARTUPINFO si;
777       ZeroMemory( &si, sizeof(si) );
778       si.cb = sizeof(si);
779       ZeroMemory( &pi, sizeof(pi) );
780
781       // Copy the command to a non-const buffer
782       char * buffer = strdup(comstr.c_str());
783
784       // launch the new process
785       bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
786                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
787
788       if (buffer) free(buffer);
789       if (!res) throw RunTimeException("Error while creating new process");
790
791       CloseHandle(pi.hThread);
792
793     } catch (GenericException & e) {
794
795       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
796     }
797
798     return pi.hProcess;
799   }
800
801 #endif
802
803
804   void BatchManager_Local::kill_child_on_exit(void * p_pid)
805   {
806 #ifndef WIN32
807     //TODO: porting of following functionality
808     pid_t child = * static_cast<pid_t *>(p_pid);
809
810     // On tue le fils
811     kill(child, SIGTERM);
812
813     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
814     // mais cette option n'est pas implementee pour le moment, car il est
815     // preferable de laisser le process fils se terminer normalement et seul.
816 #endif
817   }
818
819   void BatchManager_Local::delete_on_exit(void * arg)
820   {
821     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
822     delete p_ta;
823   }
824
825   void BatchManager_Local::setFailedOnCancel(void * arg)
826   {
827     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
828     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
829     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
830     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
831
832     // Unlock the master thread. From here, the batch manager instance should not be used.
833     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
834   }
835
836 }