Salome HOME
c7d3f60d0f857065feee0ac6a341f8d4bd9f45c2
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2011  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Batch_Constants.hxx"
55 #include "Batch_IOMutex.hxx"
56 #include "Batch_BatchManager_Local.hxx"
57 #include "Batch_RunTimeException.hxx"
58
59 using namespace std;
60
61 namespace Batch {
62
63
64   // Constructeur
65   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
66                                          CommunicationProtocolType protocolType)
67     : BatchManager(parent, host), _connect(0),
68       _protocol(CommunicationProtocol::getInstance(protocolType)),
69       _idCounter(0)
70   {
71     pthread_mutex_init(&_threads_mutex, NULL);
72     pthread_cond_init(&_threadSyncCondition, NULL);
73   }
74
75   // Destructeur
76   BatchManager_Local::~BatchManager_Local()
77   {
78     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
79       pthread_mutex_lock(&_threads_mutex);
80       string state = iter->second.param[STATE];
81       if (state != FINISHED && state != FAILED) {
82         UNDER_LOCK( cout << "Warning: Job " << iter->first <<
83                             " is not finished, it will now be canceled." << endl );
84         pthread_cancel(iter->second.thread_id);
85         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
86       }
87       pthread_mutex_unlock(&_threads_mutex);
88     }
89     pthread_mutex_destroy(&_threads_mutex);
90     pthread_cond_destroy(&_threadSyncCondition);
91   }
92
93   const CommunicationProtocol & BatchManager_Local::getProtocol() const
94   {
95     return _protocol;
96   }
97
98   // Methode pour le controle des jobs : soumet un job au gestionnaire
99   const JobId BatchManager_Local::submitJob(const Job & job)
100   {
101     Job_Local jobLocal = job;
102     Id id = _idCounter++;
103     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
104
105     // Les attributs du thread a sa creation
106     pthread_attr_t thread_attr;
107     pthread_attr_init(&thread_attr);
108     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
109
110     // Creation du thread qui va executer la commande systeme qu'on lui passe
111     pthread_t thread_id;
112     pthread_mutex_lock(&_threads_mutex);
113     int rc = pthread_create(&thread_id,
114       &thread_attr,
115       &ThreadAdapter::run,
116       static_cast<void *>(p_ta));
117
118     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
119     pthread_attr_destroy(&thread_attr);
120
121     if (rc != 0) {
122       pthread_mutex_unlock(&_threads_mutex);
123       throw RunTimeException("Can't create new thread in BatchManager_Local");
124     }
125
126     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
127     pthread_mutex_unlock(&_threads_mutex);
128
129     ostringstream id_sst;
130     id_sst << id;
131     return JobId(this, id_sst.str());
132   }
133
134   // Methode pour le controle des jobs : retire un job du gestionnaire
135   void BatchManager_Local::deleteJob(const JobId & jobid)
136   {
137     Id id;
138
139     istringstream iss(jobid.getReference());
140     iss >> id;
141
142     // @@@ --------> SECTION CRITIQUE <-------- @@@
143     pthread_mutex_lock(&_threads_mutex);
144     bool idFound = (_threads.find(id) != _threads.end());
145     if (idFound) {
146       string state = _threads[id].param[STATE];
147       if (state != FINISHED && state != FAILED) {
148         pthread_cancel(_threads[id].thread_id);
149         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
150       } else {
151         cout << "Cannot delete job " << jobid.getReference() <<
152                 ". Job is already finished." << endl;
153       }
154     }
155     pthread_mutex_unlock(&_threads_mutex);
156     // @@@ --------> SECTION CRITIQUE <-------- @@@
157
158     if (!idFound)
159       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
160   }
161
162   // Methode pour le controle des jobs : suspend un job en file d'attente
163   void BatchManager_Local::holdJob(const JobId & jobid)
164   {
165     Id id;
166     istringstream iss(jobid.getReference());
167     iss >> id;
168
169     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
170
171     // On introduit une commande dans la queue du thread
172     // @@@ --------> SECTION CRITIQUE <-------- @@@
173     pthread_mutex_lock(&_threads_mutex);
174     if (_threads.find(id) != _threads.end())
175       _threads[id].command_queue.push(HOLD);
176     pthread_mutex_unlock(&_threads_mutex);
177     // @@@ --------> SECTION CRITIQUE <-------- @@@
178   }
179
180   // Methode pour le controle des jobs : relache un job suspendu
181   void BatchManager_Local::releaseJob(const JobId & jobid)
182   {
183     Id id;
184     istringstream iss(jobid.getReference());
185     iss >> id;
186
187     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
188
189     // On introduit une commande dans la queue du thread
190     // @@@ --------> SECTION CRITIQUE <-------- @@@
191     pthread_mutex_lock(&_threads_mutex);
192     if (_threads.find(id) != _threads.end())
193       _threads[id].command_queue.push(RELEASE);
194     pthread_mutex_unlock(&_threads_mutex);
195     // @@@ --------> SECTION CRITIQUE <-------- @@@
196   }
197
198
199   // Methode pour le controle des jobs : modifie un job en file d'attente
200   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
201   {
202   }
203
204   // Methode pour le controle des jobs : modifie un job en file d'attente
205   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
206   {
207     alterJob(jobid, param, Environnement());
208   }
209
210   // Methode pour le controle des jobs : modifie un job en file d'attente
211   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
212   {
213     alterJob(jobid, Parametre(), env);
214   }
215
216
217
218   // Methode pour le controle des jobs : renvoie l'etat du job
219   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
220   {
221     Id id;
222     istringstream iss(jobid.getReference());
223     iss >> id;
224
225     Parametre param;
226     Environnement env;
227
228     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
229     // @@@ --------> SECTION CRITIQUE <-------- @@@
230     pthread_mutex_lock(&_threads_mutex);
231     std::map<Id, Child >::iterator pos = _threads.find(id);
232     bool found = (pos != _threads.end());
233     if (found) {
234       param = pos->second.param;
235       env   = pos->second.env;
236     }
237     pthread_mutex_unlock(&_threads_mutex);
238     // @@@ --------> SECTION CRITIQUE <-------- @@@
239     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
240
241     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
242
243     JobInfo_Local ji(param, env);
244     return ji;
245   }
246
247
248   // Ce manager ne peut pas reprendre un job
249   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
250   // au JobId
251   const Batch::JobId
252   BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
253   {
254     return JobId(this, "undefined");
255   }
256
257   // Methode pour le controle des jobs : teste si un job est present en machine
258   bool BatchManager_Local::isRunning(const JobId & jobid)
259   {
260     Id id;
261     istringstream iss(jobid.getReference());
262     iss >> id;
263
264     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
265     // @@@ --------> SECTION CRITIQUE <-------- @@@
266     pthread_mutex_lock(&_threads_mutex);
267     bool running = (_threads[id].param[STATE].str() == RUNNING);
268     pthread_mutex_unlock(&_threads_mutex);
269     // @@@ --------> SECTION CRITIQUE <-------- @@@
270     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
271
272     return running;
273   }
274
275
276   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
277   {
278     ostringstream exec_sub_cmd;
279
280 #ifdef WIN32
281     char drive[_MAX_DRIVE];
282     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
283     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
284 #endif
285
286     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
287
288     if (param.find(ARGUMENTS) != param.end()) {
289       Versatile V = param[ARGUMENTS];
290       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
291         StringType argt = * static_cast<StringType *>(*it);
292         string     arg  = argt;
293         exec_sub_cmd << " " << arg;
294       }
295     }
296
297     if (param.find(INFILE) != param.end()) {
298       Versatile V = param[INFILE];
299       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
300         Couple cpl = * static_cast<CoupleType*>(*it);
301         string remote = cpl.getRemote();
302         if (remote == "stdin")
303         exec_sub_cmd << " <stdin";
304       }
305     }
306
307     if (param.find(OUTFILE) != param.end()) {
308       Versatile V = param[OUTFILE];
309       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
310         Couple cpl = * static_cast<CoupleType*>(*it);
311         string remote = cpl.getRemote();
312         if (remote == "stdout") exec_sub_cmd << " 1>stdout";
313         if (remote == "stderr") exec_sub_cmd << " 2>stderr";
314       }
315     }
316
317     string user;
318     Parametre::const_iterator it = param.find(USER);
319     if (it != param.end()) {
320       user = string(it->second);
321     }
322
323     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
324   }
325
326
327
328   // Constructeur de la classe ThreadAdapter
329   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
330   _bm(bm), _job(job), _id(id)
331   {
332     // Nothing to do
333   }
334
335
336
337   // Methode d'execution du thread
338   void * BatchManager_Local::ThreadAdapter::run(void * arg)
339   {
340     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
341
342 #ifndef WIN32
343     // On bloque tous les signaux pour ce thread
344     sigset_t setmask;
345     sigfillset(&setmask);
346     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
347 #endif
348
349     // On autorise la terminaison differee du thread
350     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
351     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
352     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
353
354     // On enregistre la fonction de suppression du fils en cas d'arret du thread
355     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
356     // sera prise en compte par pthread_testcancel()
357     Process child;
358     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
359     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
360     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
361
362
363     // Le code retour cumule (ORed) de tous les appels
364     // Nul en cas de reussite de l'ensemble des operations
365     int rc = 0;
366
367     // Cette table contient la liste des fichiers a detruire a la fin du processus
368     std::vector<string> files_to_delete;
369
370
371
372     // On copie les fichiers d'entree pour le fils
373     const Parametre param   = p_ta->_job.getParametre();
374     Parametre::const_iterator it;
375
376     // On initialise la variable workdir a la valeur du Current Working Directory
377     char * cwd =
378 #ifdef WIN32
379       _getcwd(NULL, 0);
380 #else
381       new char [PATH_MAX];
382     getcwd(cwd, PATH_MAX);
383 #endif
384     string workdir = cwd;
385     delete [] cwd;
386
387     if ( (it = param.find(WORKDIR)) != param.end() ) {
388       workdir = static_cast<string>( (*it).second );
389     }
390
391     string executionhost = string(param[EXECUTIONHOST]);
392     string user;
393     if ( (it = param.find(USER)) != param.end() ) {
394       user = string(it->second);
395     }
396
397     if ( (it = param.find(INFILE)) != param.end() ) {
398       Versatile V = (*it).second;
399       Versatile::iterator Vit;
400
401       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
402         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
403         Couple cp       = cpt;
404         string local    = cp.getLocal();
405         string remote   = cp.getRemote();
406
407         std::cerr << workdir << std::endl;
408         std::cerr << remote << std::endl;
409
410         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
411                                                                     workdir + "/" + remote,
412                                                                     executionhost, user);
413         if (status) {
414           // Echec de la copie
415           rc |= 1;
416         } else {
417           // On enregistre le fichier comme etant a detruire
418           files_to_delete.push_back(workdir + "/" + remote);
419         }
420
421       }
422     }
423
424
425
426
427     // On forke/exec un nouveau process pour pouvoir controler le fils
428     // (plus finement qu'avec un appel system)
429     // int rc = system(commande.c_str());
430     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
431     //execv("/usr/bin/ssh", parmList);
432 #ifdef WIN32
433     child = p_ta->launchWin32ChildProcess();
434     p_ta->pere(child);
435 #else
436     child = fork();
437     if (child < 0) { // erreur
438       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
439
440     } else if (child > 0) { // pere
441       p_ta->pere(child);
442
443     } else { // fils
444       p_ta->fils();
445     }
446 #endif
447
448
449     // On copie les fichiers de sortie du fils
450     if ( (it = param.find(OUTFILE)) != param.end() ) {
451       Versatile V = (*it).second;
452       Versatile::iterator Vit;
453
454       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
455         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
456         Couple cp       = cpt;
457         string local    = cp.getLocal();
458         string remote   = cp.getRemote();
459
460         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
461                                                                     executionhost, user,
462                                                                     local, "", "");
463         if (status) {
464           // Echec de la copie
465           rc |= 1;
466         } else {
467           // On enregistre le fichier comme etant a detruire
468           files_to_delete.push_back(workdir + "/" + remote);
469         }
470
471       }
472     }
473
474     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
475     // ou si la creation du fils n'a pu avoir lieu
476     if ( (rc == 0) || (child < 0) ) {
477       std::vector<string>::const_iterator it;
478       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
479         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
480 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
481         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
482 #ifdef WIN32
483         remove_cmd = string("\"") + remove_cmd + string("\"");
484 #endif
485         system(remove_cmd.c_str());*/
486       }
487     }
488
489     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
490
491     // Set the job state to FINISHED or FAILED
492     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
493
494     // On retire la fonction de nettoyage de la memoire
495     pthread_cleanup_pop(0);
496
497     // On retire la fonction de suppression du fils
498     pthread_cleanup_pop(0);
499
500     // remove setFailedOnCancel function from cancel stack
501     pthread_cleanup_pop(0);
502
503     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
504
505     // On invoque la fonction de nettoyage de la memoire
506     delete_on_exit(arg);
507
508     UNDER_LOCK( cout << "Father is leaving" << endl );
509     pthread_exit(NULL);
510     return NULL;
511   }
512
513
514
515
516   void BatchManager_Local::ThreadAdapter::pere(Process child)
517   {
518     time_t child_starttime = time(NULL);
519
520     // On enregistre le fils dans la table des threads
521     pthread_t thread_id = pthread_self();
522
523     Parametre param   = _job.getParametre();
524     Environnement env = _job.getEnvironnement();
525
526     ostringstream id_sst;
527     id_sst << _id;
528     param[ID]         = id_sst.str();
529     param[STATE]      = Batch::RUNNING;
530 #ifndef WIN32
531     param[PID]        = child;
532 #endif
533
534     _bm._threads[_id].thread_id = thread_id;
535 #ifndef WIN32
536     _bm._threads[_id].pid       = child;
537 #endif
538     _bm._threads[_id].hasFailed = false;
539     _bm._threads[_id].param     = param;
540     _bm._threads[_id].env       = env;
541     _bm._threads[_id].command_queue.push(NOP);
542
543     // Unlock the master thread. From here, all shared variables must be protected
544     // from concurrent access
545     pthread_cond_signal(&_bm._threadSyncCondition);
546
547
548     // on boucle en attendant que le fils ait termine
549     while (1) {
550 #ifdef WIN32
551       DWORD exitCode;
552       BOOL res = GetExitCodeProcess(child, &exitCode);
553       if (exitCode != STILL_ACTIVE) {
554         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
555         break;
556       }
557 #else
558       int child_rc = 0;
559       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
560       if (child_wait_rc > 0) {
561          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
562          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
563          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
564          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
565          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
566          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
567          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
568 #ifdef WIFCONTINUED
569          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
570 #endif
571         if (WIFSTOPPED(child_rc)) {
572           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
573           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
574           // desactive car s'il est possible de detecter l'arret d'un process, il est
575           // plus difficile de detecter sa reprise.
576
577           // Le fils est simplement stoppe
578           // @@@ --------> SECTION CRITIQUE <-------- @@@
579           pthread_mutex_lock(&_bm._threads_mutex);
580           _bm._threads[_id].param[STATE] = Batch::PAUSED;
581           pthread_mutex_unlock(&_bm._threads_mutex);
582           // @@@ --------> SECTION CRITIQUE <-------- @@@
583           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
584
585         }
586         else {
587           // Le fils est termine, on sort de la boucle et du if englobant
588           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
589           break;
590         }
591       }
592       else if (child_wait_rc == -1) {
593         // Le fils a disparu ...
594         // @@@ --------> SECTION CRITIQUE <-------- @@@
595         pthread_mutex_lock(&_bm._threads_mutex);
596         _bm._threads[_id].hasFailed = true;
597         pthread_mutex_unlock(&_bm._threads_mutex);
598         // @@@ --------> SECTION CRITIQUE <-------- @@@
599         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
600         break;
601       }
602 #endif
603
604       // On teste si le thread doit etre detruit
605       pthread_testcancel();
606
607
608
609       // On regarde si le fils n'a pas depasse son temps (wallclock time)
610       time_t child_currenttime = time(NULL);
611       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
612       if (param.find(MAXWALLTIME) != param.end()) {
613         long maxwalltime = param[MAXWALLTIME];
614         //        cout << "child_starttime          = " << child_starttime        << endl
615         //             << "child_currenttime        = " << child_currenttime      << endl
616         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
617         //             << "maxwalltime              = " << maxwalltime            << endl
618         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
619         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
620           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
621           // On introduit une commande dans la queue du thread
622           // @@@ --------> SECTION CRITIQUE <-------- @@@
623           pthread_mutex_lock(&_bm._threads_mutex);
624           if (_bm._threads.find(_id) != _bm._threads.end())
625             _bm._threads[_id].command_queue.push(KILL);
626           pthread_mutex_unlock(&_bm._threads_mutex);
627           // @@@ --------> SECTION CRITIQUE <-------- @@@
628
629
630         } else if (child_elapsedtime_minutes > maxwalltime ) {
631           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
632           // On introduit une commande dans la queue du thread
633           // @@@ --------> SECTION CRITIQUE <-------- @@@
634           pthread_mutex_lock(&_bm._threads_mutex);
635           if (_bm._threads.find(_id) != _bm._threads.end())
636             _bm._threads[_id].command_queue.push(TERM);
637           pthread_mutex_unlock(&_bm._threads_mutex);
638           // @@@ --------> SECTION CRITIQUE <-------- @@@
639         }
640       }
641
642
643
644       // On regarde s'il y a quelque chose a faire dans la queue de commande
645       // @@@ --------> SECTION CRITIQUE <-------- @@@
646       pthread_mutex_lock(&_bm._threads_mutex);
647       if (_bm._threads.find(_id) != _bm._threads.end()) {
648         while (_bm._threads[_id].command_queue.size() > 0) {
649           Commande cmd = _bm._threads[_id].command_queue.front();
650           _bm._threads[_id].command_queue.pop();
651
652           switch (cmd) {
653     case NOP:
654       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
655       break;
656 #ifndef WIN32
657     case HOLD:
658       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
659       kill(child, SIGSTOP);
660       break;
661
662     case RELEASE:
663       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
664       kill(child, SIGCONT);
665       break;
666
667     case TERM:
668       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
669       kill(child, SIGTERM);
670       break;
671
672     case KILL:
673       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
674       kill(child, SIGKILL);
675       break;
676 #endif
677     case ALTER:
678       break;
679
680     default:
681       break;
682           }
683         }
684
685       }
686       pthread_mutex_unlock(&_bm._threads_mutex);
687       // @@@ --------> SECTION CRITIQUE <-------- @@@
688
689       // On fait une petite pause pour ne pas surcharger inutilement le processeur
690 #ifdef WIN32
691       Sleep(1000);
692 #else
693       sleep(1);
694 #endif
695
696     }
697
698   }
699
700
701
702 #ifndef WIN32
703
704   void BatchManager_Local::ThreadAdapter::fils()
705   {
706     Parametre param = _job.getParametre();
707     Parametre::iterator it;
708
709       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
710       //int result = execv("/usr/bin/ssh", parmList);
711       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
712       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
713     try {
714
715       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
716       vector<string> command;
717       if (param.find(EXECUTABLE) != param.end()) {
718         command = _bm.exec_command(param);
719       } else exit(1);
720
721       // Build the argument array argv from the command
722       char ** argv = new char * [command.size() + 1];
723       string comstr;
724       for (string::size_type i=0 ; i<command.size() ; i++) {
725         argv[i] = new char[command[i].size() + 1];
726         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
727         if (i>0) comstr += " # ";
728         comstr += command[i];
729       }
730
731       argv[command.size()] = NULL;
732
733       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
734       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
735
736       // Create the environment for the new process. Note (RB): Here we change the environment for
737       // the process launched in local. It would seem more logical to set the environment for the
738       // remote process.
739       Environnement env = _job.getEnvironnement();
740
741       char ** envp = NULL;
742       if(env.size() > 0) {
743         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
744         int i = 0;
745         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
746           const string  & key   = (*it).first;
747           const string  & value = (*it).second;
748           ostringstream oss;
749           oss << key << "=" << value;
750           envp[i]         = new char [oss.str().size() + 1];
751           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
752         }
753
754         // assert (i == env.size())
755         envp[i] = NULL;
756       }
757
758       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
759       //int result = execv("/usr/bin/ssh", parmList);
760       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
761       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
762
763
764
765       // On positionne les limites systeme imposees au fils
766       if (param.find(MAXCPUTIME) != param.end()) {
767         int maxcputime = param[MAXCPUTIME];
768         struct rlimit limit;
769         limit.rlim_cur = maxcputime;
770         limit.rlim_max = int(maxcputime * 1.1);
771         setrlimit(RLIMIT_CPU, &limit);
772       }
773
774       if (param.find(MAXDISKSIZE) != param.end()) {
775         int maxdisksize = param[MAXDISKSIZE];
776         struct rlimit limit;
777         limit.rlim_cur = maxdisksize * 1024;
778         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
779         setrlimit(RLIMIT_FSIZE, &limit);
780       }
781
782       if (param.find(MAXRAMSIZE) != param.end()) {
783         int maxramsize = param[MAXRAMSIZE];
784         struct rlimit limit;
785         limit.rlim_cur = maxramsize * 1024 * 1024;
786         limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
787         setrlimit(RLIMIT_AS, &limit);
788       }
789
790
791       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
792       //int result = execv("/usr/bin/ssh", parmList);
793       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
794       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
795
796       // On cree une session pour le fils de facon a ce qu'il ne soit pas
797       // detruit lorsque le shell se termine (le shell ouvre une session et
798       // tue tous les process appartenant a la session en quittant)
799       setsid();
800
801
802       // On ferme les descripteurs de fichiers standards
803       //close(STDIN_FILENO);
804       //close(STDOUT_FILENO);
805       //close(STDERR_FILENO);
806
807
808       // On execute la commande du fils
809       execve(argv[0], argv, envp);
810       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
811       // No need to deallocate since nothing happens after a successful exec
812
813       // Normalement on ne devrait jamais arriver ici
814       ofstream file_err("error.log");
815       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
816
817     } catch (GenericException & e) {
818
819       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
820     }
821
822     exit(99);
823   }
824
825 #else
826
827   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
828   {
829     Parametre param = _job.getParametre();
830     Parametre::iterator it;
831     PROCESS_INFORMATION pi;
832
833     try {
834
835       // EXECUTABLE is MANDATORY, if missing, we throw an exception
836       vector<string> exec_command;
837       if (param.find(EXECUTABLE) != param.end()) {
838         exec_command = _bm.exec_command(param);
839       } else {
840         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
841       }
842
843       // Build the command string from the command argument vector
844       string comstr;
845       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
846         if (i>0) comstr += " ";
847         comstr += exec_command[i];
848       }
849
850       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
851
852       // Create the environment for the new process. Note (RB): Here we change the environment for
853       // the process launched in local. It would seem more logical to set the environment for the
854       // remote process.
855       // Note that if no environment is specified, we reuse the current environment.
856       Environnement env = _job.getEnvironnement();
857       char * chNewEnv = NULL;
858
859       if(env.size() > 0) {
860         chNewEnv = new char[4096];
861         LPTSTR lpszCurrentVariable = chNewEnv;
862         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
863           const string  & key   = (*it).first;
864           const string  & value = (*it).second;
865           string envvar = key + "=" + value;
866           envvar.copy(lpszCurrentVariable, envvar.size());
867           lpszCurrentVariable[envvar.size()] = '\0';
868           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
869         }
870         // Terminate the block with a NULL byte.
871         *lpszCurrentVariable = '\0';
872       }
873
874
875       STARTUPINFO si;
876       ZeroMemory( &si, sizeof(si) );
877       si.cb = sizeof(si);
878       ZeroMemory( &pi, sizeof(pi) );
879
880       // Copy the command to a non-const buffer
881       char * buffer = strdup(comstr.c_str());
882
883       // launch the new process
884       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
885                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
886
887       if (buffer) free(buffer);
888       if (!res) throw RunTimeException("Error while creating new process");
889
890       CloseHandle(pi.hThread);
891
892     } catch (GenericException & e) {
893
894       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
895     }
896
897     return pi.hProcess;
898   }
899
900 #endif
901
902
903   void BatchManager_Local::kill_child_on_exit(void * p_pid)
904   {
905 #ifndef WIN32
906     //TODO: porting of following functionality
907     pid_t child = * static_cast<pid_t *>(p_pid);
908
909     // On tue le fils
910     kill(child, SIGTERM);
911
912     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
913     // mais cette option n'est pas implementee pour le moment, car il est
914     // preferable de laisser le process fils se terminer normalement et seul.
915 #endif
916   }
917
918   void BatchManager_Local::delete_on_exit(void * arg)
919   {
920     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
921     delete p_ta;
922   }
923
924   void BatchManager_Local::setFailedOnCancel(void * arg)
925   {
926     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
927     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
928     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
929     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
930
931     // Unlock the master thread. From here, the batch manager instance should not be used.
932     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
933   }
934
935 }