Salome HOME
Initial import: adding files
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 */
31
32 #include <iostream>
33 #include <fstream>
34 #include <sstream>
35 #include <cstdlib>
36 #include <limits.h>
37
38 #include <sys/types.h>
39 #ifdef WIN32
40 # include <direct.h>
41 #else
42 # include <sys/wait.h>
43 # include <unistd.h>
44 #endif
45 #include <ctime>
46 #include <pthread.h>
47 #include <signal.h>
48 #include <errno.h>
49 #include <string.h>
50 #include "Batch_IOMutex.hxx"
51 #include "Batch_BatchManager_Local.hxx"
52
53 using namespace std;
54
55 namespace Batch {
56
57
58   // Constructeur
59   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host) throw(InvalidArgumentException,ConnexionFailureException) : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(), _thread_id_id_association_cond()
60 #ifndef WIN32 //TODO: porting of following functionality
61     ,_thread_id_id_association()
62 #endif
63   {
64     pthread_mutex_init(&_threads_mutex, NULL);
65     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
66     pthread_cond_init(&_thread_id_id_association_cond, NULL);
67   }
68
69   // Destructeur
70   BatchManager_Local::~BatchManager_Local()
71   {
72     pthread_mutex_destroy(&_threads_mutex);
73     pthread_mutex_destroy(&_thread_id_id_association_mutex);
74     pthread_cond_destroy(&_thread_id_id_association_cond);
75   }
76
77   // Methode pour le controle des jobs : soumet un job au gestionnaire
78   const JobId BatchManager_Local::submitJob(const Job & job)
79   {
80     Job_Local jobLocal = job;
81
82     pthread_t thread_id = submit(jobLocal);
83
84     ostringstream oss;
85     oss << getIdByThread_id(thread_id);
86
87     JobId id(this, oss.str());
88
89     return id;
90   }
91
92   // Methode pour le controle des jobs : retire un job du gestionnaire
93   void BatchManager_Local::deleteJob(const JobId & jobid)
94   {
95     Id id;
96
97     istringstream iss(jobid.getReference());
98     iss >> id;
99
100     // On retrouve le thread_id du thread
101     pthread_t thread_id;
102
103     // @@@ --------> SECTION CRITIQUE <-------- @@@
104     pthread_mutex_lock(&_threads_mutex);
105     if (_threads.find(id) != _threads.end())
106       thread_id = _threads[id].thread_id;
107     pthread_mutex_unlock(&_threads_mutex);
108     // @@@ --------> SECTION CRITIQUE <-------- @@@
109
110     cancel(thread_id);
111   }
112
113   // Methode pour le controle des jobs : suspend un job en file d'attente
114   void BatchManager_Local::holdJob(const JobId & jobid)
115   {
116     Id id;
117     istringstream iss(jobid.getReference());
118     iss >> id;
119
120     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
121
122     // On introduit une commande dans la queue du thread
123     // @@@ --------> SECTION CRITIQUE <-------- @@@
124     pthread_mutex_lock(&_threads_mutex);
125     if (_threads.find(id) != _threads.end())
126       _threads[id].command_queue.push(HOLD);
127     pthread_mutex_unlock(&_threads_mutex);
128     // @@@ --------> SECTION CRITIQUE <-------- @@@
129   }
130
131   // Methode pour le controle des jobs : relache un job suspendu
132   void BatchManager_Local::releaseJob(const JobId & jobid)
133   {
134     Id id;
135     istringstream iss(jobid.getReference());
136     iss >> id;
137
138     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
139
140     // On introduit une commande dans la queue du thread
141     // @@@ --------> SECTION CRITIQUE <-------- @@@
142     pthread_mutex_lock(&_threads_mutex);
143     if (_threads.find(id) != _threads.end())
144       _threads[id].command_queue.push(RELEASE);
145     pthread_mutex_unlock(&_threads_mutex);
146     // @@@ --------> SECTION CRITIQUE <-------- @@@
147   }
148
149
150   // Methode pour le controle des jobs : modifie un job en file d'attente
151   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
152   {
153   }
154
155   // Methode pour le controle des jobs : modifie un job en file d'attente
156   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
157   {
158     alterJob(jobid, param, Environnement());
159   }
160
161   // Methode pour le controle des jobs : modifie un job en file d'attente
162   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
163   {
164     alterJob(jobid, Parametre(), env);
165   }
166
167
168
169   // Methode pour le controle des jobs : renvoie l'etat du job
170   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
171   {
172     Id id;
173     istringstream iss(jobid.getReference());
174     iss >> id;
175
176     Parametre param;
177     Environnement env;
178
179     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
180     // @@@ --------> SECTION CRITIQUE <-------- @@@
181     pthread_mutex_lock(&_threads_mutex);
182     param = _threads[id].param;
183     env   = _threads[id].env;
184     pthread_mutex_unlock(&_threads_mutex);
185     // @@@ --------> SECTION CRITIQUE <-------- @@@
186     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
187
188     JobInfo_Local ji(param, env);
189     return ji;
190   }
191
192
193
194   // Methode pour le controle des jobs : teste si un job est present en machine
195   bool BatchManager_Local::isRunning(const JobId & jobid)
196   {
197     Id id;
198     istringstream iss(jobid.getReference());
199     iss >> id;
200
201     Status status;
202
203     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
204     // @@@ --------> SECTION CRITIQUE <-------- @@@
205     pthread_mutex_lock(&_threads_mutex);
206     status = _threads[id].status;
207     pthread_mutex_unlock(&_threads_mutex);
208     // @@@ --------> SECTION CRITIQUE <-------- @@@
209     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
210
211     return (status == RUNNING);
212   }
213
214
215   // Methode d'execution d'un job
216   pthread_t BatchManager_Local::submit(const Job_Local & job)
217   {
218     // L'id du thread a creer
219     pthread_t thread_id =
220 #ifdef WIN32
221     {0,0};
222 #else
223       0;
224 #endif
225
226     // Les attributs du thread a sa creation
227     pthread_attr_t thread_attr;
228     pthread_attr_init(&thread_attr);
229     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
230
231     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
232
233     // Creation du thread qui va executer la commande systeme qu'on lui passe
234     int rc = pthread_create(&thread_id,
235       &thread_attr,
236       &ThreadAdapter::run,
237       static_cast<void *>(p_ta));
238     if (rc) {
239     }
240
241     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
242     pthread_attr_destroy(&thread_attr);
243
244     return thread_id;
245   }
246
247
248   // Methode de destruction d'un job
249   void BatchManager_Local::cancel(pthread_t thread_id)
250   {
251     pthread_cancel(thread_id);
252   }
253
254
255   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
256   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
257   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
258   // Thread_id / Id (_thread_id_id_association_mutex)
259   BatchManager_Local::Id BatchManager_Local::nextId()
260   {
261     static Id id = 0;
262     Id nextId = id++;
263     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
264     return nextId;
265   }
266
267
268   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
269   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
270   {
271     Id id = -1;
272
273     // @@@ --------> SECTION CRITIQUE <-------- @@@
274     pthread_mutex_lock(&_thread_id_id_association_mutex);
275 #ifndef WIN32 //TODO: porting of following functionality
276     while (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end())
277       pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
278
279     id = _thread_id_id_association[thread_id];
280     _thread_id_id_association.erase(thread_id);
281 #endif
282
283     pthread_mutex_unlock(&_thread_id_id_association_mutex);
284     // @@@ --------> SECTION CRITIQUE <-------- @@@
285
286     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
287     return id;
288   }
289
290
291   // Associe un Thread_id a un Id nouvellement cree
292   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
293   {
294     Id id = -1;
295
296     // @@@ --------> SECTION CRITIQUE <-------- @@@
297     pthread_mutex_lock(&_thread_id_id_association_mutex);
298 #ifndef WIN32 //TODO: porting of following functionality
299     if (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) {
300       id = _thread_id_id_association[thread_id] = nextId();
301       pthread_cond_signal(&_thread_id_id_association_cond);
302
303     } else {
304       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
305     }
306 #endif
307     pthread_mutex_unlock(&_thread_id_id_association_mutex);
308     // @@@ --------> SECTION CRITIQUE <-------- @@@
309
310     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
311     return id;
312   }
313
314
315   // Constructeur de la classe ThreadAdapter
316   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
317   _bm(bm), _job(job)
318   {
319     // Nothing to do
320   }
321
322
323
324   // Methode d'execution du thread
325   void * BatchManager_Local::ThreadAdapter::run(void * arg)
326   {
327 #ifndef WIN32 //TODO: porting of following functionality
328     // On bloque tous les signaux pour ce thread
329     sigset_t setmask;
330     sigfillset(&setmask);
331     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
332
333     // On autorise la terminaison differee du thread
334     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
335     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
336     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
337
338     // On enregistre la fonction de suppression du fils en cas d'arret du thread
339     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
340     // sera prise en compte par pthread_testcancel()
341     pid_t child;
342     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
343     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
344
345     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
346
347
348
349
350     // Le code retour cumule (ORed) de tous les appels
351     // Nul en cas de reussite de l'ensemble des operations
352     int rc = 0;
353
354     // Cette table contient la liste des fichiers a detruire a la fin du processus
355     std::vector<string> files_to_delete;
356
357
358
359     // On copie les fichiers d'entree pour le fils
360     const Parametre param   = p_ta->_job.getParametre();
361     Parametre::const_iterator it;
362
363     // On initialise la variable workdir a la valeur du Current Working Directory
364     char * cwd =
365 #ifdef WIN32
366       _getcwd(NULL, 0);
367 #else
368       new char [PATH_MAX];
369     getcwd(cwd, PATH_MAX);
370 #endif
371     string workdir = cwd;
372     delete [] cwd;
373
374     if ( (it = param.find(WORKDIR)) != param.end() ) {
375       workdir = static_cast<string>( (*it).second );
376     }
377
378     string executionhost = string(param[EXECUTIONHOST]);
379
380     if ( (it = param.find(INFILE)) != param.end() ) {
381       Versatile V = (*it).second;
382       Versatile::iterator Vit;
383
384       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
385         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
386         Couple cp       = cpt;
387         string local    = cp.getLocal();
388         string remote   = cp.getRemote();
389
390         string copy_cmd = p_ta->getBatchManager().copy_command("", local, executionhost, workdir + "/" + remote);
391         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
392
393         if (system(copy_cmd.c_str()) ) {
394           // Echec de la copie
395           rc |= 1;
396         } else {
397           // On enregistre le fichier comme etant a detruire
398           files_to_delete.push_back(workdir + "/" + remote);
399         }
400
401       }
402     }
403
404
405
406
407 #ifdef WIN32
408     //TODO
409     //Using CreateThread instead fork() POSIX function
410 #else
411     // On forke/exec un nouveau process pour pouvoir controler le fils
412     // (plus finement qu'avec un appel system)
413     // int rc = system(commande.c_str());
414     child = fork();
415     if (child < 0) { // erreur
416       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
417
418     } else if (child > 0) { // pere
419       p_ta->pere(child);
420
421     } else { // fils
422       p_ta->fils();
423     }
424 #endif
425
426
427
428     // On copie les fichiers de sortie du fils
429     if ( (it = param.find(OUTFILE)) != param.end() ) {
430       Versatile V = (*it).second;
431       Versatile::iterator Vit;
432
433       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
434         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
435         Couple cp       = cpt;
436         string local    = cp.getLocal();
437         string remote   = cp.getRemote();
438
439         string copy_cmd = p_ta->getBatchManager().copy_command(executionhost, workdir + "/" + remote, "", local);
440         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
441
442         if (system(copy_cmd.c_str()) ) {
443           // Echec de la copie
444           rc |= 1;
445         } else {
446           // On enregistre le fichier comme etant a detruire
447           files_to_delete.push_back(workdir + "/" + remote);
448         }
449
450       }
451     }
452
453     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
454     // ou si la creation du fils n'a pu avoir lieu
455     if ( (rc == 0) || (child < 0) ) {
456       std::vector<string>::const_iterator it;
457       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
458         string remove_cmd = p_ta->getBatchManager().remove_command(executionhost, *it);
459         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
460         system(remove_cmd.c_str());
461       }
462     }
463
464
465
466     // On retire la fonction de nettoyage de la memoire
467     pthread_cleanup_pop(0);
468
469     // On retire la fonction de suppression du fils
470     pthread_cleanup_pop(0);
471
472
473
474     // On invoque la fonction de nettoyage de la memoire
475     delete_on_exit(arg);
476
477     UNDER_LOCK( cout << "Father is leaving" << endl );
478     pthread_exit(NULL);
479 #endif
480     return NULL;
481   }
482
483
484
485
486   void BatchManager_Local::ThreadAdapter::pere(pid_t child)
487   {
488 #ifndef WIN32 //TODO: porting of following functionality
489     time_t child_starttime = time(NULL);
490
491     // On enregistre le fils dans la table des threads
492     pthread_t thread_id = pthread_self();
493     Id id = _bm.registerThread_id(thread_id);
494
495     Parametre param   = _job.getParametre();
496     Environnement env = _job.getEnvironnement();
497
498     ostringstream thread_id_sst;
499     thread_id_sst << id;
500     param[ID]         = thread_id_sst.str();
501     param[STATE]      = "Running";
502     param[PID]        = child;
503
504     // @@@ --------> SECTION CRITIQUE <-------- @@@
505     pthread_mutex_lock(&_bm._threads_mutex);
506     _bm._threads[id].thread_id = thread_id;
507     _bm._threads[id].pid       = child;
508     _bm._threads[id].status    = RUNNING;
509     _bm._threads[id].param     = param;
510     _bm._threads[id].env       = env;
511     _bm._threads[id].command_queue.push(NOP);
512     pthread_mutex_unlock(&_bm._threads_mutex);
513     // @@@ --------> SECTION CRITIQUE <-------- @@@
514
515
516
517
518
519     // on boucle en attendant que le fils ait termine
520     while (1) {
521       int child_rc = 0;
522       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
523       if (child_wait_rc > 0) {
524         if (WIFSTOPPED(child_rc)) {
525           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
526           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
527           // desactive car s'il est possible de detecter l'arret d'un process, il est
528           // plus difficile de detecter sa reprise.
529
530           // Le fils est simplement stoppe
531           // @@@ --------> SECTION CRITIQUE <-------- @@@
532           pthread_mutex_lock(&_bm._threads_mutex);
533           _bm._threads[id].status       = STOPPED;
534           _bm._threads[id].param[STATE] = "Stopped";
535           pthread_mutex_unlock(&_bm._threads_mutex);
536           // @@@ --------> SECTION CRITIQUE <-------- @@@
537           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
538
539         }
540         else {
541           // Le fils est termine, on sort de la boucle et du if englobant
542           // @@@ --------> SECTION CRITIQUE <-------- @@@
543           pthread_mutex_lock(&_bm._threads_mutex);
544           _bm._threads[id].status       = DONE;
545           _bm._threads[id].param[STATE] = "Done";
546           pthread_mutex_unlock(&_bm._threads_mutex);
547           // @@@ --------> SECTION CRITIQUE <-------- @@@
548           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
549           break;
550         }
551       }
552       else if (child_wait_rc == -1) {
553         // Le fils a disparu ...
554         // @@@ --------> SECTION CRITIQUE <-------- @@@
555         pthread_mutex_lock(&_bm._threads_mutex);
556         _bm._threads[id].status       = DEAD;
557         _bm._threads[id].param[STATE] = "Dead";
558         pthread_mutex_unlock(&_bm._threads_mutex);
559         // @@@ --------> SECTION CRITIQUE <-------- @@@
560         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
561         break;
562       }
563
564
565
566       // On teste si le thread doit etre detruit
567       pthread_testcancel();
568
569
570
571       // On regarde si le fils n'a pas depasse son temps (wallclock time)
572       time_t child_currenttime = time(NULL);
573       time_t child_elapsedtime = child_currenttime - child_starttime;
574       if (param.find(MAXWALLTIME) != param.end()) {
575         int maxwalltime = param[MAXWALLTIME];
576         //        cout << "child_starttime          = " << child_starttime        << endl
577         //             << "child_currenttime        = " << child_currenttime      << endl
578         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
579         //             << "maxwalltime              = " << maxwalltime            << endl
580         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
581         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
582           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
583           // On introduit une commande dans la queue du thread
584           // @@@ --------> SECTION CRITIQUE <-------- @@@
585           pthread_mutex_lock(&_bm._threads_mutex);
586           if (_bm._threads.find(id) != _bm._threads.end())
587             _bm._threads[id].command_queue.push(KILL);
588           pthread_mutex_unlock(&_bm._threads_mutex);
589           // @@@ --------> SECTION CRITIQUE <-------- @@@
590
591
592         } else if (child_elapsedtime > maxwalltime ) {
593           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
594           // On introduit une commande dans la queue du thread
595           // @@@ --------> SECTION CRITIQUE <-------- @@@
596           pthread_mutex_lock(&_bm._threads_mutex);
597           if (_bm._threads.find(id) != _bm._threads.end())
598             _bm._threads[id].command_queue.push(TERM);
599           pthread_mutex_unlock(&_bm._threads_mutex);
600           // @@@ --------> SECTION CRITIQUE <-------- @@@
601         }
602       }
603
604
605
606       // On regarde s'il y a quelque chose a faire dans la queue de commande
607       // @@@ --------> SECTION CRITIQUE <-------- @@@
608       pthread_mutex_lock(&_bm._threads_mutex);
609       if (_bm._threads.find(id) != _bm._threads.end()) {
610         while (_bm._threads[id].command_queue.size() > 0) {
611           Commande cmd = _bm._threads[id].command_queue.front();
612           _bm._threads[id].command_queue.pop();
613
614           switch (cmd) {
615     case NOP:
616       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
617       break;
618
619     case HOLD:
620       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
621       kill(child, SIGSTOP);
622       break;
623
624     case RELEASE:
625       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
626       kill(child, SIGCONT);
627       break;
628
629     case TERM:
630       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
631       kill(child, SIGTERM);
632       break;
633
634     case KILL:
635       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
636       kill(child, SIGKILL);
637       break;
638
639     case ALTER:
640       break;
641
642     default:
643       break;
644           }
645         }
646
647       }
648       pthread_mutex_unlock(&_bm._threads_mutex);
649       // @@@ --------> SECTION CRITIQUE <-------- @@@
650
651       // On fait une petite pause pour ne pas surcharger inutilement le processeur
652       sleep(1);
653
654     }
655 #endif
656
657
658   }
659
660
661
662
663   void BatchManager_Local::ThreadAdapter::fils()
664   {
665 #ifndef WIN32 //TODO: porting of following functionality
666     Parametre param = _job.getParametre();
667     Parametre::iterator it;
668
669     try {
670
671       // On se place dans le repertoire de travail
672       if ( (it = param.find(WORKDIR)) != param.end() ) {
673         string workdir = static_cast<string>( (*it).second );
674         chdir(workdir.c_str());
675       }
676
677
678
679
680       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
681       char * execpath = NULL;
682       if (param.find(EXECUTABLE) != param.end()) {
683         string executable = _bm.exec_command(param);
684         execpath          = new char [executable.size() + 1];
685         strncpy(execpath, executable.c_str(), executable.size() + 1);
686       } else exit(1);
687
688       string debug_command = execpath;
689
690       string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
691
692       char **  argv = NULL;
693       if (param.find(ARGUMENTS) != param.end()) {
694         Versatile V = param[ARGUMENTS];
695
696         argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
697
698         argv[0] = new char [name.size() + 1];
699         strncpy(argv[0], name.c_str(), name.size() + 1);
700
701         debug_command  += string(" # ") + argv[0];
702
703         int i = 1;
704         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
705           StringType argt = * static_cast<StringType *>(*it);
706           string     arg  = argt;
707           argv[i]         = new char [arg.size() + 1];
708           strncpy(argv[i], arg.c_str(), arg.size() + 1);
709           debug_command  += string(" # ") + argv[i];
710         }
711
712         // assert (i == V.size() + 1)
713         argv[i] = NULL;
714       }
715
716
717       UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
718
719
720
721       Environnement env = _job.getEnvironnement();
722
723
724       char ** envp = NULL;
725       if(env.size() > 0) {
726         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
727         int i = 0;
728         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
729           const string  & key   = (*it).first;
730           const string  & value = (*it).second;
731           ostringstream oss;
732           oss << key << "=" << value;
733           envp[i]         = new char [oss.str().size() + 1];
734           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
735         }
736
737         // assert (i == env.size())
738         envp[i] = NULL;
739       }
740
741
742
743
744       // On positionne les limites systeme imposees au fils
745       if (param.find(MAXCPUTIME) != param.end()) {
746         int maxcputime = param[MAXCPUTIME];
747         struct rlimit limit;
748         limit.rlim_cur = maxcputime;
749         limit.rlim_max = int(maxcputime * 1.1);
750         setrlimit(RLIMIT_CPU, &limit);
751       }
752
753       if (param.find(MAXDISKSIZE) != param.end()) {
754         int maxdisksize = param[MAXDISKSIZE];
755         struct rlimit limit;
756         limit.rlim_cur = maxdisksize * 1024;
757         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
758         setrlimit(RLIMIT_FSIZE, &limit);
759       }
760
761       if (param.find(MAXRAMSIZE) != param.end()) {
762         int maxramsize = param[MAXRAMSIZE];
763         struct rlimit limit;
764         limit.rlim_cur = maxramsize * 1024;
765         limit.rlim_max = int(maxramsize * 1.1) * 1024;
766         setrlimit(RLIMIT_AS, &limit);
767       }
768
769
770
771       // On cree une session pour le fils de facon a ce qu'il ne soit pas
772       // detruit lorsque le shell se termine (le shell ouvre une session et
773       // tue tous les process appartenant a la session en quittant)
774       setsid();
775
776
777       // On ferme les descripteurs de fichiers standards
778       //close(STDIN_FILENO);
779       //close(STDOUT_FILENO);
780       //close(STDERR_FILENO);
781
782
783       // On execute la commande du fils
784       execve(execpath, argv, envp);
785
786       // No need to deallocate since nothing happens after a successful exec
787
788       // Normalement on ne devrait jamais arriver ici
789       ofstream file_err("error.log");
790       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
791
792     } catch (GenericException & e) {
793
794       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
795     }
796
797     exit(99);
798 #endif
799   }
800
801
802
803
804   void BatchManager_Local::kill_child_on_exit(void * p_pid)
805   {
806 #ifndef WIN32
807     //TODO: porting of following functionality
808     pid_t child = * static_cast<pid_t *>(p_pid);
809
810     // On tue le fils
811     kill(child, SIGTERM);
812
813     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
814     // mais cette option n'est pas implementee pour le moment, car il est
815     // preferable de laisser le process fils se terminer normalement et seul.
816 #endif
817   }
818
819   void BatchManager_Local::delete_on_exit(void * arg)
820   {
821     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
822     delete p_ta;
823   }
824
825 }