Salome HOME
A little fix for compilation on old gcc (i.e. 3.3.5)
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #include "Batch_RunTimeException.hxx"
45 #else
46 #include <sys/wait.h>
47 #include <unistd.h>
48 #endif
49 #include <ctime>
50 #include <pthread.h>
51 #include <signal.h>
52 #include <errno.h>
53 #include <string.h>
54 #include "Batch_IOMutex.hxx"
55 #include "Batch_BatchManager_Local.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65     : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
66       _protocol(CommunicationProtocol::getInstance(protocolType)),
67       _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
68   {
69     pthread_mutex_init(&_threads_mutex, NULL);
70     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
71     pthread_cond_init(&_thread_id_id_association_cond, NULL);
72   }
73
74   // Destructeur
75   BatchManager_Local::~BatchManager_Local()
76   {
77     pthread_mutex_destroy(&_threads_mutex);
78     pthread_mutex_destroy(&_thread_id_id_association_mutex);
79     pthread_cond_destroy(&_thread_id_id_association_cond);
80   }
81
82   const CommunicationProtocol & BatchManager_Local::getProtocol() const
83   {
84     return _protocol;
85   }
86
87   // Methode pour le controle des jobs : soumet un job au gestionnaire
88   const JobId BatchManager_Local::submitJob(const Job & job)
89   {
90     Job_Local jobLocal = job;
91
92     pthread_t thread_id = submit(jobLocal);
93
94     ostringstream oss;
95     oss << getIdByThread_id(thread_id);
96
97     JobId id(this, oss.str());
98
99     return id;
100   }
101
102   // Methode pour le controle des jobs : retire un job du gestionnaire
103   void BatchManager_Local::deleteJob(const JobId & jobid)
104   {
105     Id id;
106
107     istringstream iss(jobid.getReference());
108     iss >> id;
109
110     // On retrouve le thread_id du thread
111     pthread_t thread_id;
112
113     // @@@ --------> SECTION CRITIQUE <-------- @@@
114     pthread_mutex_lock(&_threads_mutex);
115     if (_threads.find(id) != _threads.end())
116       thread_id = _threads[id].thread_id;
117     pthread_mutex_unlock(&_threads_mutex);
118     // @@@ --------> SECTION CRITIQUE <-------- @@@
119
120     cancel(thread_id);
121   }
122
123   // Methode pour le controle des jobs : suspend un job en file d'attente
124   void BatchManager_Local::holdJob(const JobId & jobid)
125   {
126     Id id;
127     istringstream iss(jobid.getReference());
128     iss >> id;
129
130     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
131
132     // On introduit une commande dans la queue du thread
133     // @@@ --------> SECTION CRITIQUE <-------- @@@
134     pthread_mutex_lock(&_threads_mutex);
135     if (_threads.find(id) != _threads.end())
136       _threads[id].command_queue.push(HOLD);
137     pthread_mutex_unlock(&_threads_mutex);
138     // @@@ --------> SECTION CRITIQUE <-------- @@@
139   }
140
141   // Methode pour le controle des jobs : relache un job suspendu
142   void BatchManager_Local::releaseJob(const JobId & jobid)
143   {
144     Id id;
145     istringstream iss(jobid.getReference());
146     iss >> id;
147
148     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
149
150     // On introduit une commande dans la queue du thread
151     // @@@ --------> SECTION CRITIQUE <-------- @@@
152     pthread_mutex_lock(&_threads_mutex);
153     if (_threads.find(id) != _threads.end())
154       _threads[id].command_queue.push(RELEASE);
155     pthread_mutex_unlock(&_threads_mutex);
156     // @@@ --------> SECTION CRITIQUE <-------- @@@
157   }
158
159
160   // Methode pour le controle des jobs : modifie un job en file d'attente
161   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
162   {
163   }
164
165   // Methode pour le controle des jobs : modifie un job en file d'attente
166   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
167   {
168     alterJob(jobid, param, Environnement());
169   }
170
171   // Methode pour le controle des jobs : modifie un job en file d'attente
172   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
173   {
174     alterJob(jobid, Parametre(), env);
175   }
176
177
178
179   // Methode pour le controle des jobs : renvoie l'etat du job
180   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
181   {
182     Id id;
183     istringstream iss(jobid.getReference());
184     iss >> id;
185
186     Parametre param;
187     Environnement env;
188
189     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
190     // @@@ --------> SECTION CRITIQUE <-------- @@@
191     pthread_mutex_lock(&_threads_mutex);
192     param = _threads[id].param;
193     env   = _threads[id].env;
194     pthread_mutex_unlock(&_threads_mutex);
195     // @@@ --------> SECTION CRITIQUE <-------- @@@
196     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
197
198     JobInfo_Local ji(param, env);
199     return ji;
200   }
201
202
203
204   // Methode pour le controle des jobs : teste si un job est present en machine
205   bool BatchManager_Local::isRunning(const JobId & jobid)
206   {
207     Id id;
208     istringstream iss(jobid.getReference());
209     iss >> id;
210
211     Status status;
212
213     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
214     // @@@ --------> SECTION CRITIQUE <-------- @@@
215     pthread_mutex_lock(&_threads_mutex);
216     status = _threads[id].status;
217     pthread_mutex_unlock(&_threads_mutex);
218     // @@@ --------> SECTION CRITIQUE <-------- @@@
219     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
220
221     return (status == RUNNING);
222   }
223
224
225   // Methode d'execution d'un job
226   pthread_t BatchManager_Local::submit(const Job_Local & job)
227   {
228     // L'id du thread a creer
229     pthread_t thread_id =
230 #ifdef WIN32
231     {0,0};
232 #else
233       0;
234 #endif
235
236     // Les attributs du thread a sa creation
237     pthread_attr_t thread_attr;
238     pthread_attr_init(&thread_attr);
239     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
240
241     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
242
243     // Creation du thread qui va executer la commande systeme qu'on lui passe
244     int rc = pthread_create(&thread_id,
245       &thread_attr,
246       &ThreadAdapter::run,
247       static_cast<void *>(p_ta));
248     if (rc) {
249     }
250
251     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
252     pthread_attr_destroy(&thread_attr);
253
254     return thread_id;
255   }
256
257
258   // Methode de destruction d'un job
259   void BatchManager_Local::cancel(pthread_t thread_id)
260   {
261     pthread_cancel(thread_id);
262   }
263
264
265   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
266   {
267     ostringstream exec_sub_cmd;
268
269 #ifdef WIN32
270     char drive[_MAX_DRIVE];
271     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
272     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
273 #endif
274
275     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
276
277     if (param.find(ARGUMENTS) != param.end()) {
278       Versatile V = param[ARGUMENTS];
279       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
280         StringType argt = * static_cast<StringType *>(*it);
281         string     arg  = argt;
282         exec_sub_cmd << " " << arg;
283       }
284     }
285
286     string user;
287     Parametre::const_iterator it = param.find(USER);
288     if (it != param.end()) {
289       user = string(it->second);
290     }
291
292     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
293   }
294
295
296   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
297   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
298   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
299   // Thread_id / Id (_thread_id_id_association_mutex)
300   BatchManager_Local::Id BatchManager_Local::nextId()
301   {
302     static Id id = 0;
303     Id nextId = id++;
304     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
305     return nextId;
306   }
307
308
309   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
310   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
311   {
312     // @@@ --------> SECTION CRITIQUE <-------- @@@
313     pthread_mutex_lock(&_thread_id_id_association_mutex);
314     bool threadIdFound = false;
315     std::list<struct ThreadIdIdAssociation>::iterator it;
316     while (!threadIdFound) {
317       for (it = _thread_id_id_association.begin() ;
318            it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
319            it++);
320       if (it == _thread_id_id_association.end())
321         pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
322       else
323         threadIdFound = true;
324     }
325
326     Id id = it->id;
327     _thread_id_id_association.erase(it);
328
329     pthread_mutex_unlock(&_thread_id_id_association_mutex);
330     // @@@ --------> SECTION CRITIQUE <-------- @@@
331
332     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
333     return id;
334   }
335
336
337   // Associe un Thread_id a un Id nouvellement cree
338   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
339   {
340     Id id = -1;
341
342     // @@@ --------> SECTION CRITIQUE <-------- @@@
343     pthread_mutex_lock(&_thread_id_id_association_mutex);
344     std::list<struct ThreadIdIdAssociation>::iterator it;
345     for (it = _thread_id_id_association.begin() ;
346          it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
347          it++);
348     if (it == _thread_id_id_association.end()) {
349       struct ThreadIdIdAssociation newAssociation;
350       id = newAssociation.id = nextId();
351       newAssociation.threadId = thread_id;
352       _thread_id_id_association.push_back(newAssociation);
353       pthread_cond_signal(&_thread_id_id_association_cond);
354
355     } else {
356       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
357     }
358     pthread_mutex_unlock(&_thread_id_id_association_mutex);
359     // @@@ --------> SECTION CRITIQUE <-------- @@@
360
361     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
362     return id;
363   }
364
365
366   // Constructeur de la classe ThreadAdapter
367   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
368   _bm(bm), _job(job)
369   {
370     // Nothing to do
371   }
372
373
374
375   // Methode d'execution du thread
376   void * BatchManager_Local::ThreadAdapter::run(void * arg)
377   {
378 #ifndef WIN32
379     // On bloque tous les signaux pour ce thread
380     sigset_t setmask;
381     sigfillset(&setmask);
382     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
383 #endif
384
385     // On autorise la terminaison differee du thread
386     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
387     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
388     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
389
390     // On enregistre la fonction de suppression du fils en cas d'arret du thread
391     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
392     // sera prise en compte par pthread_testcancel()
393     Process child;
394     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
395     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
396
397     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
398
399
400
401
402     // Le code retour cumule (ORed) de tous les appels
403     // Nul en cas de reussite de l'ensemble des operations
404     int rc = 0;
405
406     // Cette table contient la liste des fichiers a detruire a la fin du processus
407     std::vector<string> files_to_delete;
408
409
410
411     // On copie les fichiers d'entree pour le fils
412     const Parametre param   = p_ta->_job.getParametre();
413     Parametre::const_iterator it;
414
415     // On initialise la variable workdir a la valeur du Current Working Directory
416     char * cwd =
417 #ifdef WIN32
418       _getcwd(NULL, 0);
419 #else
420       new char [PATH_MAX];
421     getcwd(cwd, PATH_MAX);
422 #endif
423     string workdir = cwd;
424     delete [] cwd;
425
426     if ( (it = param.find(WORKDIR)) != param.end() ) {
427       workdir = static_cast<string>( (*it).second );
428     }
429
430     string executionhost = string(param[EXECUTIONHOST]);
431     string user;
432     if ( (it = param.find(USER)) != param.end() ) {
433       user = string(it->second);
434     }
435
436     if ( (it = param.find(INFILE)) != param.end() ) {
437       Versatile V = (*it).second;
438       Versatile::iterator Vit;
439
440       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
441         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
442         Couple cp       = cpt;
443         string local    = cp.getLocal();
444         string remote   = cp.getRemote();
445
446         std::cerr << workdir << std::endl;
447         std::cerr << remote << std::endl;
448
449         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
450                                                                     workdir + "/" + remote,
451                                                                     executionhost, user);
452         if (status) {
453           // Echec de la copie
454           rc |= 1;
455         } else {
456           // On enregistre le fichier comme etant a detruire
457           files_to_delete.push_back(workdir + "/" + remote);
458         }
459
460       }
461     }
462
463
464
465
466     // On forke/exec un nouveau process pour pouvoir controler le fils
467     // (plus finement qu'avec un appel system)
468     // int rc = system(commande.c_str());
469     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
470     //execv("/usr/bin/ssh", parmList);
471 #ifdef WIN32
472     child = p_ta->launchWin32ChildProcess();
473     p_ta->pere(child);
474 #else
475     child = fork();
476     if (child < 0) { // erreur
477       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
478
479     } else if (child > 0) { // pere
480       p_ta->pere(child);
481
482     } else { // fils
483       p_ta->fils();
484     }
485 #endif
486
487
488     // On copie les fichiers de sortie du fils
489     if ( (it = param.find(OUTFILE)) != param.end() ) {
490       Versatile V = (*it).second;
491       Versatile::iterator Vit;
492
493       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
494         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
495         Couple cp       = cpt;
496         string local    = cp.getLocal();
497         string remote   = cp.getRemote();
498
499         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
500                                                                     executionhost, user,
501                                                                     local, "", "");
502         if (status) {
503           // Echec de la copie
504           rc |= 1;
505         } else {
506           // On enregistre le fichier comme etant a detruire
507           files_to_delete.push_back(workdir + "/" + remote);
508         }
509
510       }
511     }
512
513     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
514     // ou si la creation du fils n'a pu avoir lieu
515     if ( (rc == 0) || (child < 0) ) {
516       std::vector<string>::const_iterator it;
517       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
518         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
519 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
520         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
521 #ifdef WIN32
522         remove_cmd = string("\"") + remove_cmd + string("\"");
523 #endif
524         system(remove_cmd.c_str());*/
525       }
526     }
527
528
529
530     // On retire la fonction de nettoyage de la memoire
531     pthread_cleanup_pop(0);
532
533     // On retire la fonction de suppression du fils
534     pthread_cleanup_pop(0);
535
536
537
538     // On invoque la fonction de nettoyage de la memoire
539     delete_on_exit(arg);
540
541     UNDER_LOCK( cout << "Father is leaving" << endl );
542     pthread_exit(NULL);
543     return NULL;
544   }
545
546
547
548
549   void BatchManager_Local::ThreadAdapter::pere(Process child)
550   {
551     time_t child_starttime = time(NULL);
552
553     // On enregistre le fils dans la table des threads
554     pthread_t thread_id = pthread_self();
555     Id id = _bm.registerThread_id(thread_id);
556
557     Parametre param   = _job.getParametre();
558     Environnement env = _job.getEnvironnement();
559
560     ostringstream thread_id_sst;
561     thread_id_sst << id;
562     param[ID]         = thread_id_sst.str();
563     param[STATE]      = "Running";
564 #ifndef WIN32
565     param[PID]        = child;
566 #endif
567
568     // @@@ --------> SECTION CRITIQUE <-------- @@@
569     pthread_mutex_lock(&_bm._threads_mutex);
570     _bm._threads[id].thread_id = thread_id;
571 #ifndef WIN32
572     _bm._threads[id].pid       = child;
573 #endif
574     _bm._threads[id].status    = RUNNING;
575     _bm._threads[id].param     = param;
576     _bm._threads[id].env       = env;
577     _bm._threads[id].command_queue.push(NOP);
578     pthread_mutex_unlock(&_bm._threads_mutex);
579     // @@@ --------> SECTION CRITIQUE <-------- @@@
580
581
582
583
584     // on boucle en attendant que le fils ait termine
585     while (1) {
586 #ifdef WIN32
587       DWORD exitCode;
588       BOOL res = GetExitCodeProcess(child, &exitCode);
589       if (exitCode != STILL_ACTIVE) {
590         pthread_mutex_lock(&_bm._threads_mutex);
591         _bm._threads[id].status       = DONE;
592         _bm._threads[id].param[STATE] = "Done";
593         pthread_mutex_unlock(&_bm._threads_mutex);
594         // @@@ --------> SECTION CRITIQUE <-------- @@@
595         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
596         break;
597       }
598 #else
599       int child_rc = 0;
600       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
601       if (child_wait_rc > 0) {
602          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
603          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
604          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
605          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
606          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
607          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
608          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
609 #ifdef WIFCONTINUED
610          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
611 #endif
612         if (WIFSTOPPED(child_rc)) {
613           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
614           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
615           // desactive car s'il est possible de detecter l'arret d'un process, il est
616           // plus difficile de detecter sa reprise.
617
618           // Le fils est simplement stoppe
619           // @@@ --------> SECTION CRITIQUE <-------- @@@
620           pthread_mutex_lock(&_bm._threads_mutex);
621           _bm._threads[id].status       = STOPPED;
622           _bm._threads[id].param[STATE] = "Stopped";
623           pthread_mutex_unlock(&_bm._threads_mutex);
624           // @@@ --------> SECTION CRITIQUE <-------- @@@
625           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
626
627         }
628         else {
629           // Le fils est termine, on sort de la boucle et du if englobant
630           // @@@ --------> SECTION CRITIQUE <-------- @@@
631           pthread_mutex_lock(&_bm._threads_mutex);
632           _bm._threads[id].status       = DONE;
633           _bm._threads[id].param[STATE] = "Done";
634           pthread_mutex_unlock(&_bm._threads_mutex);
635           // @@@ --------> SECTION CRITIQUE <-------- @@@
636           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
637           break;
638         }
639       }
640       else if (child_wait_rc == -1) {
641         // Le fils a disparu ...
642         // @@@ --------> SECTION CRITIQUE <-------- @@@
643         pthread_mutex_lock(&_bm._threads_mutex);
644         _bm._threads[id].status       = DEAD;
645         _bm._threads[id].param[STATE] = "Dead";
646         pthread_mutex_unlock(&_bm._threads_mutex);
647         // @@@ --------> SECTION CRITIQUE <-------- @@@
648         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
649         break;
650       }
651 #endif
652
653       // On teste si le thread doit etre detruit
654       pthread_testcancel();
655
656
657
658       // On regarde si le fils n'a pas depasse son temps (wallclock time)
659       time_t child_currenttime = time(NULL);
660       time_t child_elapsedtime = child_currenttime - child_starttime;
661       if (param.find(MAXWALLTIME) != param.end()) {
662         int maxwalltime = param[MAXWALLTIME];
663         //        cout << "child_starttime          = " << child_starttime        << endl
664         //             << "child_currenttime        = " << child_currenttime      << endl
665         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
666         //             << "maxwalltime              = " << maxwalltime            << endl
667         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
668         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
669           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
670           // On introduit une commande dans la queue du thread
671           // @@@ --------> SECTION CRITIQUE <-------- @@@
672           pthread_mutex_lock(&_bm._threads_mutex);
673           if (_bm._threads.find(id) != _bm._threads.end())
674             _bm._threads[id].command_queue.push(KILL);
675           pthread_mutex_unlock(&_bm._threads_mutex);
676           // @@@ --------> SECTION CRITIQUE <-------- @@@
677
678
679         } else if (child_elapsedtime > maxwalltime ) {
680           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
681           // On introduit une commande dans la queue du thread
682           // @@@ --------> SECTION CRITIQUE <-------- @@@
683           pthread_mutex_lock(&_bm._threads_mutex);
684           if (_bm._threads.find(id) != _bm._threads.end())
685             _bm._threads[id].command_queue.push(TERM);
686           pthread_mutex_unlock(&_bm._threads_mutex);
687           // @@@ --------> SECTION CRITIQUE <-------- @@@
688         }
689       }
690
691
692
693       // On regarde s'il y a quelque chose a faire dans la queue de commande
694       // @@@ --------> SECTION CRITIQUE <-------- @@@
695       pthread_mutex_lock(&_bm._threads_mutex);
696       if (_bm._threads.find(id) != _bm._threads.end()) {
697         while (_bm._threads[id].command_queue.size() > 0) {
698           Commande cmd = _bm._threads[id].command_queue.front();
699           _bm._threads[id].command_queue.pop();
700
701           switch (cmd) {
702     case NOP:
703       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
704       break;
705 #ifndef WIN32
706     case HOLD:
707       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
708       kill(child, SIGSTOP);
709       break;
710
711     case RELEASE:
712       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
713       kill(child, SIGCONT);
714       break;
715
716     case TERM:
717       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
718       kill(child, SIGTERM);
719       break;
720
721     case KILL:
722       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
723       kill(child, SIGKILL);
724       break;
725 #endif
726     case ALTER:
727       break;
728
729     default:
730       break;
731           }
732         }
733
734       }
735       pthread_mutex_unlock(&_bm._threads_mutex);
736       // @@@ --------> SECTION CRITIQUE <-------- @@@
737
738       // On fait une petite pause pour ne pas surcharger inutilement le processeur
739 #ifdef WIN32
740       Sleep(1000);
741 #else
742       sleep(1);
743 #endif
744
745     }
746
747   }
748
749
750
751 #ifndef WIN32
752
753   void BatchManager_Local::ThreadAdapter::fils()
754   {
755     Parametre param = _job.getParametre();
756     Parametre::iterator it;
757
758       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
759       //int result = execv("/usr/bin/ssh", parmList);
760       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
761       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
762     try {
763
764       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
765       vector<string> command;
766       if (param.find(EXECUTABLE) != param.end()) {
767         command = _bm.exec_command(param);
768       } else exit(1);
769
770       // Build the argument array argv from the command
771       char ** argv = new char * [command.size() + 1];
772       string comstr;
773       for (string::size_type i=0 ; i<command.size() ; i++) {
774         argv[i] = new char[command[i].size() + 1];
775         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
776         if (i>0) comstr += " # ";
777         comstr += command[i];
778       }
779
780       argv[command.size()] = NULL;
781
782       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
783       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
784
785       // Create the environment for the new process. Note (RB): Here we change the environment for
786       // the process launched in local. It would seem more logical to set the environment for the
787       // remote process.
788       Environnement env = _job.getEnvironnement();
789
790       char ** envp = NULL;
791       if(env.size() > 0) {
792         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
793         int i = 0;
794         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
795           const string  & key   = (*it).first;
796           const string  & value = (*it).second;
797           ostringstream oss;
798           oss << key << "=" << value;
799           envp[i]         = new char [oss.str().size() + 1];
800           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
801         }
802
803         // assert (i == env.size())
804         envp[i] = NULL;
805       }
806
807       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
808       //int result = execv("/usr/bin/ssh", parmList);
809       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
810       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
811
812
813
814       // On positionne les limites systeme imposees au fils
815       if (param.find(MAXCPUTIME) != param.end()) {
816         int maxcputime = param[MAXCPUTIME];
817         struct rlimit limit;
818         limit.rlim_cur = maxcputime;
819         limit.rlim_max = int(maxcputime * 1.1);
820         setrlimit(RLIMIT_CPU, &limit);
821       }
822
823       if (param.find(MAXDISKSIZE) != param.end()) {
824         int maxdisksize = param[MAXDISKSIZE];
825         struct rlimit limit;
826         limit.rlim_cur = maxdisksize * 1024;
827         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
828         setrlimit(RLIMIT_FSIZE, &limit);
829       }
830
831       if (param.find(MAXRAMSIZE) != param.end()) {
832         int maxramsize = param[MAXRAMSIZE];
833         struct rlimit limit;
834         limit.rlim_cur = maxramsize * 1024;
835         limit.rlim_max = int(maxramsize * 1.1) * 1024;
836         setrlimit(RLIMIT_AS, &limit);
837       }
838
839
840       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
841       //int result = execv("/usr/bin/ssh", parmList);
842       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
843       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
844
845       // On cree une session pour le fils de facon a ce qu'il ne soit pas
846       // detruit lorsque le shell se termine (le shell ouvre une session et
847       // tue tous les process appartenant a la session en quittant)
848       setsid();
849
850
851       // On ferme les descripteurs de fichiers standards
852       //close(STDIN_FILENO);
853       //close(STDOUT_FILENO);
854       //close(STDERR_FILENO);
855
856
857       // On execute la commande du fils
858       int result = execve(argv[0], argv, envp);
859       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
860       // No need to deallocate since nothing happens after a successful exec
861
862       // Normalement on ne devrait jamais arriver ici
863       ofstream file_err("error.log");
864       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
865
866     } catch (GenericException & e) {
867
868       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
869     }
870
871     exit(99);
872   }
873
874 #else
875
876   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
877   {
878     Parametre param = _job.getParametre();
879     Parametre::iterator it;
880     PROCESS_INFORMATION pi;
881
882     try {
883
884       // EXECUTABLE is MANDATORY, if missing, we throw an exception
885       vector<string> exec_command;
886       if (param.find(EXECUTABLE) != param.end()) {
887         exec_command = _bm.exec_command(param);
888       } else {
889         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
890       }
891
892       // Build the command string from the command argument vector
893       string comstr;
894       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
895         if (i>0) comstr += " ";
896         comstr += exec_command[i];
897       }
898
899       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
900
901       // Create the environment for the new process. Note (RB): Here we change the environment for
902       // the process launched in local. It would seem more logical to set the environment for the
903       // remote process.
904       // Note that if no environment is specified, we reuse the current environment.
905       Environnement env = _job.getEnvironnement();
906       char * chNewEnv = NULL;
907
908       if(env.size() > 0) {
909         chNewEnv = new char[4096];
910         LPTSTR lpszCurrentVariable = chNewEnv;
911         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
912           const string  & key   = (*it).first;
913           const string  & value = (*it).second;
914           string envvar = key + "=" + value;
915           envvar.copy(lpszCurrentVariable, envvar.size());
916           lpszCurrentVariable[envvar.size()] = '\0';
917           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
918         }
919         // Terminate the block with a NULL byte.
920         *lpszCurrentVariable = '\0';
921       }
922
923
924       STARTUPINFO si;
925       ZeroMemory( &si, sizeof(si) );
926       si.cb = sizeof(si);
927       ZeroMemory( &pi, sizeof(pi) );
928
929       // Copy the command to a non-const buffer
930       char * buffer = strdup(comstr.c_str());
931
932       // launch the new process
933       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
934                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
935
936       if (buffer) free(buffer);
937       if (!res) throw RunTimeException("Error while creating new process");
938
939       CloseHandle(pi.hThread);
940
941     } catch (GenericException & e) {
942
943       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
944     }
945
946     return pi.hProcess;
947   }
948
949 #endif
950
951
952   void BatchManager_Local::kill_child_on_exit(void * p_pid)
953   {
954 #ifndef WIN32
955     //TODO: porting of following functionality
956     pid_t child = * static_cast<pid_t *>(p_pid);
957
958     // On tue le fils
959     kill(child, SIGTERM);
960
961     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
962     // mais cette option n'est pas implementee pour le moment, car il est
963     // preferable de laisser le process fils se terminer normalement et seul.
964 #endif
965   }
966
967   void BatchManager_Local::delete_on_exit(void * arg)
968   {
969     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
970     delete p_ta;
971   }
972
973 }