]> SALOME platform Git repositories - modules/kernel.git/blob - src/Batch/Batch_BatchManager_Local.cxx
Salome HOME
merge from branch BR_V5_DEV
[modules/kernel.git] / src / Batch / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx : 
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 */
31
32 #include <iostream>
33 #include <fstream>
34 #include <sstream>
35 #include <cstdlib>
36 #include <limits.h>
37
38 #include <sys/types.h>
39 #ifdef WIN32
40 # include <direct.h>
41 #else
42 # include <sys/wait.h>
43 # include <unistd.h>
44 #endif
45 #include <ctime>
46 #include <pthread.h>
47 #include <signal.h>
48 #include <errno.h>
49 #include <string.h>
50 #include "Batch_IOMutex.hxx"
51 #include "Batch_BatchManager_Local.hxx"
52
53 using namespace std;
54
55 namespace Batch {
56
57
58   // Constructeur
59   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host) throw(InvalidArgumentException,ConnexionFailureException) : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(), _thread_id_id_association_cond()
60 #ifndef WIN32 //TODO: porting of following functionality
61     ,_thread_id_id_association()
62 #endif
63   {
64     pthread_mutex_init(&_threads_mutex, NULL);
65     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
66     pthread_cond_init(&_thread_id_id_association_cond, NULL);
67   }
68
69   // Destructeur
70   BatchManager_Local::~BatchManager_Local()
71   {
72     pthread_mutex_destroy(&_threads_mutex);
73     pthread_mutex_destroy(&_thread_id_id_association_mutex);
74     pthread_cond_destroy(&_thread_id_id_association_cond);
75   }
76
77   // Methode pour le controle des jobs : soumet un job au gestionnaire
78   const JobId BatchManager_Local::submitJob(const Job & job)
79   {
80     Job_Local jobLocal = job;
81
82     pthread_t thread_id = submit(jobLocal);
83
84     ostringstream oss;
85     oss << getIdByThread_id(thread_id);
86
87     JobId id(this, oss.str());
88
89     return id;
90   }
91
92   // Methode pour le controle des jobs : retire un job du gestionnaire
93   void BatchManager_Local::deleteJob(const JobId & jobid)
94   {
95     Id id;
96
97     istringstream iss(jobid.getReference());
98     iss >> id;
99
100     // On retrouve le thread_id du thread
101     pthread_t thread_id;
102
103     // @@@ --------> SECTION CRITIQUE <-------- @@@
104     pthread_mutex_lock(&_threads_mutex);
105     if (_threads.find(id) != _threads.end()) 
106       thread_id = _threads[id].thread_id;
107     pthread_mutex_unlock(&_threads_mutex);
108     // @@@ --------> SECTION CRITIQUE <-------- @@@
109
110     cancel(thread_id);
111   }
112
113   // Methode pour le controle des jobs : suspend un job en file d'attente
114   void BatchManager_Local::holdJob(const JobId & jobid)
115   {
116     Id id;
117     istringstream iss(jobid.getReference());
118     iss >> id;
119
120     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
121
122     // On introduit une commande dans la queue du thread
123     // @@@ --------> SECTION CRITIQUE <-------- @@@
124     pthread_mutex_lock(&_threads_mutex);
125     if (_threads.find(id) != _threads.end()) 
126       _threads[id].command_queue.push(HOLD);
127     pthread_mutex_unlock(&_threads_mutex);
128     // @@@ --------> SECTION CRITIQUE <-------- @@@
129   }
130
131   // Methode pour le controle des jobs : relache un job suspendu
132   void BatchManager_Local::releaseJob(const JobId & jobid)
133   {
134     Id id;
135     istringstream iss(jobid.getReference());
136     iss >> id;
137
138     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
139
140     // On introduit une commande dans la queue du thread
141     // @@@ --------> SECTION CRITIQUE <-------- @@@
142     pthread_mutex_lock(&_threads_mutex);
143     if (_threads.find(id) != _threads.end()) 
144       _threads[id].command_queue.push(RELEASE);
145     pthread_mutex_unlock(&_threads_mutex);
146     // @@@ --------> SECTION CRITIQUE <-------- @@@
147   }
148
149
150   // Methode pour le controle des jobs : modifie un job en file d'attente
151   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
152   {
153   }
154
155   // Methode pour le controle des jobs : modifie un job en file d'attente
156   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
157   {
158     alterJob(jobid, param, Environnement());
159   }
160
161   // Methode pour le controle des jobs : modifie un job en file d'attente
162   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
163   {
164     alterJob(jobid, Parametre(), env);
165   }
166
167
168
169   // Methode pour le controle des jobs : renvoie l'etat du job
170   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
171   {
172     Id id;
173     istringstream iss(jobid.getReference());
174     iss >> id;
175
176     Parametre param;
177     Environnement env;
178
179     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
180     // @@@ --------> SECTION CRITIQUE <-------- @@@
181     pthread_mutex_lock(&_threads_mutex);
182     param = _threads[id].param;
183     env   = _threads[id].env;
184     pthread_mutex_unlock(&_threads_mutex);
185     // @@@ --------> SECTION CRITIQUE <-------- @@@
186     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
187
188     JobInfo_Local ji(param, env);
189     return ji;
190   }
191
192
193
194   // Methode pour le controle des jobs : teste si un job est present en machine
195   bool BatchManager_Local::isRunning(const JobId & jobid)
196   {
197     Id id;
198     istringstream iss(jobid.getReference());
199     iss >> id;
200
201     Status status;
202
203     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
204     // @@@ --------> SECTION CRITIQUE <-------- @@@
205     pthread_mutex_lock(&_threads_mutex);
206     status = _threads[id].status;
207     pthread_mutex_unlock(&_threads_mutex);
208     // @@@ --------> SECTION CRITIQUE <-------- @@@
209     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
210
211     return (status == RUNNING);
212   }
213
214
215   // Methode d'execution d'un job
216   pthread_t BatchManager_Local::submit(const Job_Local & job)
217   {
218     // L'id du thread a creer
219     pthread_t thread_id = 
220 #ifdef WIN32
221     {0,0};
222 #else
223       0;
224 #endif
225
226     // Les attributs du thread a sa creation
227     pthread_attr_t thread_attr;
228     pthread_attr_init(&thread_attr);
229     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
230
231     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
232
233     // Creation du thread qui va executer la commande systeme qu'on lui passe
234     int rc = pthread_create(&thread_id, 
235       &thread_attr, 
236       &ThreadAdapter::run, 
237       static_cast<void *>(p_ta));
238     if (rc) {
239     }
240
241     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
242     pthread_attr_destroy(&thread_attr);
243
244     return thread_id;
245   }
246
247
248   // Methode de destruction d'un job
249   void BatchManager_Local::cancel(pthread_t thread_id)
250   {
251     pthread_cancel(thread_id);
252   }
253
254
255   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique 
256   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
257   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
258   // Thread_id / Id (_thread_id_id_association_mutex)
259   BatchManager_Local::Id BatchManager_Local::nextId() 
260   {
261     static Id id = 0;
262     Id nextId = id++;
263     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
264     return nextId;
265   }
266
267
268   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
269   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
270   {
271     Id id = -1;
272
273     // @@@ --------> SECTION CRITIQUE <-------- @@@
274     pthread_mutex_lock(&_thread_id_id_association_mutex);
275 #ifndef WIN32 //TODO: porting of following functionality
276     while (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) 
277       pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
278
279     id = _thread_id_id_association[thread_id];
280     _thread_id_id_association.erase(thread_id);
281 #endif
282
283     pthread_mutex_unlock(&_thread_id_id_association_mutex);
284     // @@@ --------> SECTION CRITIQUE <-------- @@@
285
286     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
287     return id;
288   }
289
290
291   // Associe un Thread_id a un Id nouvellement cree
292   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) 
293   {
294     Id id = -1;
295
296     // @@@ --------> SECTION CRITIQUE <-------- @@@
297     pthread_mutex_lock(&_thread_id_id_association_mutex);
298 #ifndef WIN32 //TODO: porting of following functionality
299     if (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) {
300       id = _thread_id_id_association[thread_id] = nextId();
301       pthread_cond_signal(&_thread_id_id_association_cond);
302
303     } else {
304       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
305     }
306 #endif
307     pthread_mutex_unlock(&_thread_id_id_association_mutex);
308     // @@@ --------> SECTION CRITIQUE <-------- @@@
309
310     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
311     return id;
312   }
313
314
315   // Constructeur de la classe ThreadAdapter
316   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
317   _bm(bm), _job(job)
318   {
319     // Nothing to do
320   }
321
322
323
324   // Methode d'execution du thread
325   void * BatchManager_Local::ThreadAdapter::run(void * arg)
326   {
327 #ifndef WIN32 //TODO: porting of following functionality
328     // On bloque tous les signaux pour ce thread
329     sigset_t setmask;
330     sigfillset(&setmask);
331     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
332
333     // On autorise la terminaison differee du thread
334     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
335     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
336     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
337
338     // On enregistre la fonction de suppression du fils en cas d'arret du thread
339     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
340     // sera prise en compte par pthread_testcancel()
341     pid_t child;
342     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
343     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
344
345     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
346
347
348
349
350     // Le code retour cumule (ORed) de tous les appels
351     // Nul en cas de reussite de l'ensemble des operations
352     int rc = 0;
353
354     // Cette table contient la liste des fichiers a detruire a la fin du processus
355     std::vector<string> files_to_delete;
356
357
358
359     // On copie les fichiers d'entree pour le fils
360     const Parametre param   = p_ta->_job.getParametre();
361     Parametre::const_iterator it;
362
363     // On initialise la variable workdir a la valeur du Current Working Directory
364     char * cwd = 
365 #ifdef WIN32
366       _getcwd(NULL, 0);
367 #else
368       new char [PATH_MAX];
369     getcwd(cwd, PATH_MAX);
370 #endif
371     string workdir = cwd;
372     delete [] cwd;
373
374     if ( (it = param.find(WORKDIR)) != param.end() ) {
375       workdir = static_cast<string>( (*it).second );
376     }
377
378     string executionhost = string(param[EXECUTIONHOST]);
379
380     if ( (it = param.find(INFILE)) != param.end() ) {
381       Versatile V = (*it).second;
382       Versatile::iterator Vit;
383
384       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
385         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
386         Couple cp       = cpt;
387         string local    = cp.getLocal();
388         string remote   = cp.getRemote();
389
390         string copy_cmd = p_ta->getBatchManager().copy_command("", local, executionhost, workdir + "/" + remote);
391         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
392
393         if (system(copy_cmd.c_str()) ) {
394           // Echec de la copie
395           rc |= 1;
396         } else {
397           // On enregistre le fichier comme etant a detruire
398           files_to_delete.push_back(workdir + "/" + remote);
399         }
400
401       }
402     }
403
404
405
406
407 #ifdef WIN32
408     //TODO
409     //Using CreateThread instead fork() POSIX function
410 #else
411     // On forke/exec un nouveau process pour pouvoir controler le fils
412     // (plus finement qu'avec un appel system)
413     // int rc = system(commande.c_str());
414     child = fork();
415     if (child < 0) { // erreur
416       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
417
418     } else if (child > 0) { // pere
419       p_ta->pere(child);
420
421     } else { // fils
422       p_ta->fils();
423     }
424 #endif
425
426
427
428     // On copie les fichiers de sortie du fils
429     if ( (it = param.find(OUTFILE)) != param.end() ) {
430       Versatile V = (*it).second;
431       Versatile::iterator Vit;
432
433       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
434         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
435         Couple cp       = cpt;
436         string local    = cp.getLocal();
437         string remote   = cp.getRemote();
438
439         string copy_cmd = p_ta->getBatchManager().copy_command(executionhost, workdir + "/" + remote, "", local);
440         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
441
442         if (system(copy_cmd.c_str()) ) {
443           // Echec de la copie
444           rc |= 1;
445         } else {
446           // On enregistre le fichier comme etant a detruire
447           files_to_delete.push_back(workdir + "/" + remote);
448         }
449
450       }
451     }
452
453
454
455
456     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
457     // ou si la creation du fils n'a pu avoir lieu
458     if ( (rc == 0) || (child < 0) ) {
459       std::vector<string>::const_iterator it;
460       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
461         string remove_cmd = p_ta->getBatchManager().remove_command(executionhost, *it);
462         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
463         system(remove_cmd.c_str());
464       }
465     }
466
467
468
469     // On retire la fonction de nettoyage de la memoire
470     pthread_cleanup_pop(0);
471
472     // On retire la fonction de suppression du fils
473     pthread_cleanup_pop(0);
474
475
476
477     // On invoque la fonction de nettoyage de la memoire
478     delete_on_exit(arg);
479
480     UNDER_LOCK( cout << "Father is leaving" << endl );
481     pthread_exit(NULL);
482 #endif
483     return NULL;
484   }
485
486
487
488
489   void BatchManager_Local::ThreadAdapter::pere(pid_t child)
490   {
491 #ifndef WIN32 //TODO: porting of following functionality
492     time_t child_starttime = time(NULL);
493
494     // On enregistre le fils dans la table des threads
495     pthread_t thread_id = pthread_self();
496     Id id = _bm.registerThread_id(thread_id);
497
498     Parametre param   = _job.getParametre();
499     Environnement env = _job.getEnvironnement();
500
501     ostringstream thread_id_sst;
502     thread_id_sst << id;
503     param[ID]         = thread_id_sst.str();
504     param[STATE]      = "Running";
505     param[PID]        = child;
506
507     // @@@ --------> SECTION CRITIQUE <-------- @@@
508     pthread_mutex_lock(&_bm._threads_mutex);
509     _bm._threads[id].thread_id = thread_id;
510     _bm._threads[id].pid       = child;
511     _bm._threads[id].status    = RUNNING;
512     _bm._threads[id].param     = param;
513     _bm._threads[id].env       = env;
514     _bm._threads[id].command_queue.push(NOP);
515     pthread_mutex_unlock(&_bm._threads_mutex);
516     // @@@ --------> SECTION CRITIQUE <-------- @@@
517
518
519
520
521
522     // on boucle en attendant que le fils ait termine
523     while (1) {
524       int child_rc = 0;
525       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
526       if (child_wait_rc > 0) {
527         if (WIFSTOPPED(child_rc)) {
528           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED 
529           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment 
530           // desactive car s'il est possible de detecter l'arret d'un process, il est 
531           // plus difficile de detecter sa reprise.
532
533           // Le fils est simplement stoppe
534           // @@@ --------> SECTION CRITIQUE <-------- @@@
535           pthread_mutex_lock(&_bm._threads_mutex);
536           _bm._threads[id].status       = STOPPED;
537           _bm._threads[id].param[STATE] = "Stopped";
538           pthread_mutex_unlock(&_bm._threads_mutex);
539           // @@@ --------> SECTION CRITIQUE <-------- @@@
540           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
541
542         } 
543         else {
544           // Le fils est termine, on sort de la boucle et du if englobant
545           // @@@ --------> SECTION CRITIQUE <-------- @@@
546           pthread_mutex_lock(&_bm._threads_mutex);
547           _bm._threads[id].status       = DONE;
548           _bm._threads[id].param[STATE] = "Done";
549           pthread_mutex_unlock(&_bm._threads_mutex);
550           // @@@ --------> SECTION CRITIQUE <-------- @@@
551           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
552           break;
553         }
554       }
555       else if (child_wait_rc == -1) {
556         // Le fils a disparu ...
557         // @@@ --------> SECTION CRITIQUE <-------- @@@
558         pthread_mutex_lock(&_bm._threads_mutex);
559         _bm._threads[id].status       = DEAD;
560         _bm._threads[id].param[STATE] = "Dead";
561         pthread_mutex_unlock(&_bm._threads_mutex);
562         // @@@ --------> SECTION CRITIQUE <-------- @@@
563         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
564         break;
565       }
566
567
568
569       // On teste si le thread doit etre detruit
570       pthread_testcancel();
571
572
573
574       // On regarde si le fils n'a pas depasse son temps (wallclock time)
575       time_t child_currenttime = time(NULL);
576       time_t child_elapsedtime = child_currenttime - child_starttime;
577       if (param.find(MAXWALLTIME) != param.end()) {
578         int maxwalltime = param[MAXWALLTIME];
579         //        cout << "child_starttime          = " << child_starttime        << endl
580         //             << "child_currenttime        = " << child_currenttime      << endl
581         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
582         //             << "maxwalltime              = " << maxwalltime            << endl
583         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
584         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
585           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
586           // On introduit une commande dans la queue du thread
587           // @@@ --------> SECTION CRITIQUE <-------- @@@
588           pthread_mutex_lock(&_bm._threads_mutex);
589           if (_bm._threads.find(id) != _bm._threads.end()) 
590             _bm._threads[id].command_queue.push(KILL);
591           pthread_mutex_unlock(&_bm._threads_mutex);
592           // @@@ --------> SECTION CRITIQUE <-------- @@@
593
594
595         } else if (child_elapsedtime > maxwalltime ) {
596           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
597           // On introduit une commande dans la queue du thread
598           // @@@ --------> SECTION CRITIQUE <-------- @@@
599           pthread_mutex_lock(&_bm._threads_mutex);
600           if (_bm._threads.find(id) != _bm._threads.end()) 
601             _bm._threads[id].command_queue.push(TERM);
602           pthread_mutex_unlock(&_bm._threads_mutex);
603           // @@@ --------> SECTION CRITIQUE <-------- @@@
604         }
605       }
606
607
608
609       // On regarde s'il y a quelque chose a faire dans la queue de commande
610       // @@@ --------> SECTION CRITIQUE <-------- @@@
611       pthread_mutex_lock(&_bm._threads_mutex);
612       if (_bm._threads.find(id) != _bm._threads.end()) {
613         while (_bm._threads[id].command_queue.size() > 0) {
614           Commande cmd = _bm._threads[id].command_queue.front();
615           _bm._threads[id].command_queue.pop();
616
617           switch (cmd) {
618     case NOP:
619       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
620       break;
621
622     case HOLD:
623       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
624       kill(child, SIGSTOP);
625       break;
626
627     case RELEASE:
628       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
629       kill(child, SIGCONT);
630       break;
631
632     case TERM:
633       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
634       kill(child, SIGTERM);
635       break;
636
637     case KILL:
638       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
639       kill(child, SIGKILL);
640       break;
641
642     case ALTER:
643       break;
644
645     default:
646       break;
647           }
648         }
649
650       }
651       pthread_mutex_unlock(&_bm._threads_mutex);
652       // @@@ --------> SECTION CRITIQUE <-------- @@@
653
654       // On fait une petite pause pour ne pas surcharger inutilement le processeur
655       sleep(1);
656
657     }
658 #endif
659
660
661   }
662
663
664
665
666   void BatchManager_Local::ThreadAdapter::fils()
667   {
668 #ifndef WIN32 //TODO: porting of following functionality
669     Parametre param = _job.getParametre();
670     Parametre::iterator it;
671
672     try {
673
674       // On se place dans le repertoire de travail
675       if ( (it = param.find(WORKDIR)) != param.end() ) {
676         string workdir = static_cast<string>( (*it).second );
677         chdir(workdir.c_str());
678       }
679
680
681
682
683       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
684       char * execpath = NULL;
685       if (param.find(EXECUTABLE) != param.end()) {
686         string executable = _bm.exec_command(param);
687         execpath          = new char [executable.size() + 1];
688         strncpy(execpath, executable.c_str(), executable.size() + 1);
689       } else exit(1); 
690
691       string debug_command = execpath;
692
693       string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
694
695       char **  argv = NULL;
696       if (param.find(ARGUMENTS) != param.end()) {
697         Versatile V = param[ARGUMENTS];
698
699         argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
700
701         argv[0] = new char [name.size() + 1];
702         strncpy(argv[0], name.c_str(), name.size() + 1);
703
704         debug_command  += string(" # ") + argv[0];
705
706         int i = 1;
707         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
708           StringType argt = * static_cast<StringType *>(*it);
709           string     arg  = argt;
710           argv[i]         = new char [arg.size() + 1];
711           strncpy(argv[i], arg.c_str(), arg.size() + 1);
712           debug_command  += string(" # ") + argv[i];
713         }
714
715         // assert (i == V.size() + 1)
716         argv[i] = NULL;
717       }
718
719
720       UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
721
722
723
724       Environnement env = _job.getEnvironnement();
725
726
727       char ** envp = NULL;
728       if(env.size() > 0) {
729         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
730         int i = 0;
731         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
732           const string  & key   = (*it).first;
733           const string  & value = (*it).second;
734           ostringstream oss;
735           oss << key << "=" << value;
736           envp[i]         = new char [oss.str().size() + 1];
737           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
738         }
739
740         // assert (i == env.size())
741         envp[i] = NULL;
742       }
743
744
745
746
747       // On positionne les limites systeme imposees au fils
748       if (param.find(MAXCPUTIME) != param.end()) {
749         int maxcputime = param[MAXCPUTIME];
750         struct rlimit limit;
751         limit.rlim_cur = maxcputime;
752         limit.rlim_max = int(maxcputime * 1.1);
753         setrlimit(RLIMIT_CPU, &limit);
754       }
755
756       if (param.find(MAXDISKSIZE) != param.end()) {
757         int maxdisksize = param[MAXDISKSIZE];
758         struct rlimit limit;
759         limit.rlim_cur = maxdisksize * 1024;
760         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
761         setrlimit(RLIMIT_FSIZE, &limit);
762       }
763
764       if (param.find(MAXRAMSIZE) != param.end()) {
765         int maxramsize = param[MAXRAMSIZE];
766         struct rlimit limit;
767         limit.rlim_cur = maxramsize * 1024;
768         limit.rlim_max = int(maxramsize * 1.1) * 1024;
769         setrlimit(RLIMIT_AS, &limit);
770       }
771
772
773
774       // On cree une session pour le fils de facon a ce qu'il ne soit pas
775       // detruit lorsque le shell se termine (le shell ouvre une session et
776       // tue tous les process appartenant a la session en quittant)
777       setsid();
778
779
780       // On ferme les descripteurs de fichiers standards
781       //close(STDIN_FILENO);
782       //close(STDOUT_FILENO);
783       //close(STDERR_FILENO);
784
785
786       // On execute la commande du fils
787       execve(execpath, argv, envp);
788
789       // No need to deallocate since nothing happens after a successful exec
790
791       // Normalement on ne devrait jamais arriver ici    
792       ofstream file_err("error.log");
793       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
794
795     } catch (GenericException & e) {
796
797       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
798     }
799
800     exit(99);
801 #endif
802   }
803
804
805
806
807   void BatchManager_Local::kill_child_on_exit(void * p_pid)
808   {
809 #ifndef WIN32 
810     //TODO: porting of following functionality
811     pid_t child = * static_cast<pid_t *>(p_pid);
812
813     // On tue le fils
814     kill(child, SIGTERM);
815
816     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
817     // mais cette option n'est pas implementee pour le moment, car il est 
818     // preferable de laisser le process fils se terminer normalement et seul.
819 #endif
820   }
821
822   void BatchManager_Local::delete_on_exit(void * arg)
823   {
824     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
825     delete p_ta;
826   }
827
828 }