]> SALOME platform Git repositories - modules/kernel.git/blob - src/Batch/Batch_BatchManager_Local.cxx
Salome HOME
Merging from V4_1_0_maintainance for porting on Win32 Platform
[modules/kernel.git] / src / Batch / Batch_BatchManager_Local.cxx
1 // Copyright (C) 2005  OPEN CASCADE, CEA, EDF R&D, LEG
2 //           PRINCIPIA R&D, EADS CCR, Lip6, BV, CEDRAT
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either 
6 // version 2.1 of the License.
7 // 
8 // This library is distributed in the hope that it will be useful 
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of 
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
11 // Lesser General Public License for more details.
12 // 
13 // You should have received a copy of the GNU Lesser General Public  
14 // License along with this library; if not, write to the Free Software 
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 // 
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 // 
19 /*
20 * BatchManager_Local.cxx : 
21 *
22 * Auteur : Ivan DUTKA-MALEN - EDF R&D
23 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
24 * Date   : Thu Nov  6 10:17:22 2003
25 * Projet : Salome 2
26 *
27 */
28
29 #include <iostream>
30 #include <fstream>
31 #include <sstream>
32 #include <cstdlib>
33 #include <sys/types.h>
34 #ifdef WIN32
35 # include <direct.h>
36 #else
37 # include <sys/wait.h>
38 # include <unistd.h>
39 #endif
40 #include <ctime>
41 #include <pthread.h>
42 #include <signal.h>
43 #include <errno.h>
44 #include <string.h>
45 #include "Batch_IOMutex.hxx"
46 #include "Batch_BatchManager_Local.hxx"
47
48 using namespace std;
49
50 namespace Batch {
51
52
53   // Constructeur
54   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host) throw(InvalidArgumentException,ConnexionFailureException) : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(), _thread_id_id_association_cond()
55 #ifndef WIN32 //TODO: porting of following functionality
56     ,_thread_id_id_association()
57 #endif
58   {
59     pthread_mutex_init(&_threads_mutex, NULL);
60     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
61     pthread_cond_init(&_thread_id_id_association_cond, NULL);
62   }
63
64   // Destructeur
65   BatchManager_Local::~BatchManager_Local()
66   {
67     pthread_mutex_destroy(&_threads_mutex);
68     pthread_mutex_destroy(&_thread_id_id_association_mutex);
69     pthread_cond_destroy(&_thread_id_id_association_cond);
70   }
71
72   // Methode pour le controle des jobs : soumet un job au gestionnaire
73   const JobId BatchManager_Local::submitJob(const Job & job)
74   {
75     Job_Local jobLocal = job;
76
77     pthread_t thread_id = submit(jobLocal);
78
79     ostringstream oss;
80     oss << getIdByThread_id(thread_id);
81
82     JobId id(this, oss.str());
83
84     return id;
85   }
86
87   // Methode pour le controle des jobs : retire un job du gestionnaire
88   void BatchManager_Local::deleteJob(const JobId & jobid)
89   {
90     Id id;
91
92     istringstream iss(jobid.getReference());
93     iss >> id;
94
95     // On retrouve le thread_id du thread
96     pthread_t thread_id;
97
98     // @@@ --------> SECTION CRITIQUE <-------- @@@
99     pthread_mutex_lock(&_threads_mutex);
100     if (_threads.find(id) != _threads.end()) 
101       thread_id = _threads[id].thread_id;
102     pthread_mutex_unlock(&_threads_mutex);
103     // @@@ --------> SECTION CRITIQUE <-------- @@@
104
105     cancel(thread_id);
106   }
107
108   // Methode pour le controle des jobs : suspend un job en file d'attente
109   void BatchManager_Local::holdJob(const JobId & jobid)
110   {
111     Id id;
112     istringstream iss(jobid.getReference());
113     iss >> id;
114
115     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
116
117     // On introduit une commande dans la queue du thread
118     // @@@ --------> SECTION CRITIQUE <-------- @@@
119     pthread_mutex_lock(&_threads_mutex);
120     if (_threads.find(id) != _threads.end()) 
121       _threads[id].command_queue.push(HOLD);
122     pthread_mutex_unlock(&_threads_mutex);
123     // @@@ --------> SECTION CRITIQUE <-------- @@@
124   }
125
126   // Methode pour le controle des jobs : relache un job suspendu
127   void BatchManager_Local::releaseJob(const JobId & jobid)
128   {
129     Id id;
130     istringstream iss(jobid.getReference());
131     iss >> id;
132
133     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
134
135     // On introduit une commande dans la queue du thread
136     // @@@ --------> SECTION CRITIQUE <-------- @@@
137     pthread_mutex_lock(&_threads_mutex);
138     if (_threads.find(id) != _threads.end()) 
139       _threads[id].command_queue.push(RELEASE);
140     pthread_mutex_unlock(&_threads_mutex);
141     // @@@ --------> SECTION CRITIQUE <-------- @@@
142   }
143
144
145   // Methode pour le controle des jobs : modifie un job en file d'attente
146   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
147   {
148   }
149
150   // Methode pour le controle des jobs : modifie un job en file d'attente
151   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
152   {
153     alterJob(jobid, param, Environnement());
154   }
155
156   // Methode pour le controle des jobs : modifie un job en file d'attente
157   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
158   {
159     alterJob(jobid, Parametre(), env);
160   }
161
162
163
164   // Methode pour le controle des jobs : renvoie l'etat du job
165   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
166   {
167     Id id;
168     istringstream iss(jobid.getReference());
169     iss >> id;
170
171     Parametre param;
172     Environnement env;
173
174     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
175     // @@@ --------> SECTION CRITIQUE <-------- @@@
176     pthread_mutex_lock(&_threads_mutex);
177     param = _threads[id].param;
178     env   = _threads[id].env;
179     pthread_mutex_unlock(&_threads_mutex);
180     // @@@ --------> SECTION CRITIQUE <-------- @@@
181     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
182
183     JobInfo_Local ji(param, env);
184     return ji;
185   }
186
187
188
189   // Methode pour le controle des jobs : teste si un job est present en machine
190   bool BatchManager_Local::isRunning(const JobId & jobid)
191   {
192     Id id;
193     istringstream iss(jobid.getReference());
194     iss >> id;
195
196     Status status;
197
198     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
199     // @@@ --------> SECTION CRITIQUE <-------- @@@
200     pthread_mutex_lock(&_threads_mutex);
201     status = _threads[id].status;
202     pthread_mutex_unlock(&_threads_mutex);
203     // @@@ --------> SECTION CRITIQUE <-------- @@@
204     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
205
206     return (status == RUNNING);
207   }
208
209
210   // Methode d'execution d'un job
211   pthread_t BatchManager_Local::submit(const Job_Local & job)
212   {
213     // L'id du thread a creer
214     pthread_t thread_id = 
215 #ifdef WIN32
216     {0,0};
217 #else
218       0;
219 #endif
220
221     // Les attributs du thread a sa creation
222     pthread_attr_t thread_attr;
223     pthread_attr_init(&thread_attr);
224     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
225
226     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
227
228     // Creation du thread qui va executer la commande systeme qu'on lui passe
229     int rc = pthread_create(&thread_id, 
230       &thread_attr, 
231       &ThreadAdapter::run, 
232       static_cast<void *>(p_ta));
233     if (rc) {
234     }
235
236     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
237     pthread_attr_destroy(&thread_attr);
238
239     return thread_id;
240   }
241
242
243   // Methode de destruction d'un job
244   void BatchManager_Local::cancel(pthread_t thread_id)
245   {
246     pthread_cancel(thread_id);
247   }
248
249
250   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique 
251   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
252   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
253   // Thread_id / Id (_thread_id_id_association_mutex)
254   BatchManager_Local::Id BatchManager_Local::nextId() 
255   {
256     static Id id = 0;
257     Id nextId = id++;
258     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
259     return nextId;
260   }
261
262
263   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
264   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
265   {
266     Id id = -1;
267
268     // @@@ --------> SECTION CRITIQUE <-------- @@@
269     pthread_mutex_lock(&_thread_id_id_association_mutex);
270 #ifndef WIN32 //TODO: porting of following functionality
271     while (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) 
272       pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
273
274     id = _thread_id_id_association[thread_id];
275     _thread_id_id_association.erase(thread_id);
276 #endif
277
278     pthread_mutex_unlock(&_thread_id_id_association_mutex);
279     // @@@ --------> SECTION CRITIQUE <-------- @@@
280
281     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
282     return id;
283   }
284
285
286   // Associe un Thread_id a un Id nouvellement cree
287   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) 
288   {
289     Id id = -1;
290
291     // @@@ --------> SECTION CRITIQUE <-------- @@@
292     pthread_mutex_lock(&_thread_id_id_association_mutex);
293 #ifndef WIN32 //TODO: porting of following functionality
294     if (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) {
295       id = _thread_id_id_association[thread_id] = nextId();
296       pthread_cond_signal(&_thread_id_id_association_cond);
297
298     } else {
299       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
300     }
301 #endif
302     pthread_mutex_unlock(&_thread_id_id_association_mutex);
303     // @@@ --------> SECTION CRITIQUE <-------- @@@
304
305     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
306     return id;
307   }
308
309
310   // Constructeur de la classe ThreadAdapter
311   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
312   _bm(bm), _job(job)
313   {
314     // Nothing to do
315   }
316
317
318
319   // Methode d'execution du thread
320   void * BatchManager_Local::ThreadAdapter::run(void * arg)
321   {
322 #ifndef WIN32 //TODO: porting of following functionality
323     // On bloque tous les signaux pour ce thread
324     sigset_t setmask;
325     sigfillset(&setmask);
326     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
327
328     // On autorise la terminaison differee du thread
329     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
330     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
331     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
332
333     // On enregistre la fonction de suppression du fils en cas d'arret du thread
334     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
335     // sera prise en compte par pthread_testcancel()
336     pid_t child;
337     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
338     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
339
340     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
341
342
343
344
345     // Le code retour cumule (ORed) de tous les appels
346     // Nul en cas de reussite de l'ensemble des operations
347     int rc = 0;
348
349     // Cette table contient la liste des fichiers a detruire a la fin du processus
350     std::vector<string> files_to_delete;
351
352
353
354     // On copie les fichiers d'entree pour le fils
355     const Parametre param   = p_ta->_job.getParametre();
356     Parametre::const_iterator it;
357
358     // On initialise la variable workdir a la valeur du Current Working Directory
359     char * cwd = 
360 #ifdef WIN32
361       _getcwd(NULL, 0);
362 #else
363       new char [PATH_MAX];
364     getcwd(cwd, PATH_MAX);
365 #endif
366     string workdir = cwd;
367     delete [] cwd;
368
369     if ( (it = param.find(WORKDIR)) != param.end() ) {
370       workdir = static_cast<string>( (*it).second );
371     }
372
373     string executionhost = string(param[EXECUTIONHOST]);
374
375     if ( (it = param.find(INFILE)) != param.end() ) {
376       Versatile V = (*it).second;
377       Versatile::iterator Vit;
378
379       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
380         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
381         Couple cp       = cpt;
382         string local    = cp.getLocal();
383         string remote   = cp.getRemote();
384
385         string copy_cmd = p_ta->getBatchManager().copy_command("", local, executionhost, workdir + "/" + remote);
386         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
387
388         if (system(copy_cmd.c_str()) ) {
389           // Echec de la copie
390           rc |= 1;
391         } else {
392           // On enregistre le fichier comme etant a detruire
393           files_to_delete.push_back(workdir + "/" + remote);
394         }
395
396       }
397     }
398
399
400
401
402 #ifdef WIN32
403     //TODO
404     //Using CreateThread instead fork() POSIX function
405 #else
406     // On forke/exec un nouveau process pour pouvoir controler le fils
407     // (plus finement qu'avec un appel system)
408     // int rc = system(commande.c_str());
409     child = fork();
410     if (child < 0) { // erreur
411       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
412
413     } else if (child > 0) { // pere
414       p_ta->pere(child);
415
416     } else { // fils
417       p_ta->fils();
418     }
419 #endif
420
421
422
423     // On copie les fichiers de sortie du fils
424     if ( (it = param.find(OUTFILE)) != param.end() ) {
425       Versatile V = (*it).second;
426       Versatile::iterator Vit;
427
428       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
429         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
430         Couple cp       = cpt;
431         string local    = cp.getLocal();
432         string remote   = cp.getRemote();
433
434         string copy_cmd = p_ta->getBatchManager().copy_command(executionhost, workdir + "/" + remote, "", local);
435         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
436
437         if (system(copy_cmd.c_str()) ) {
438           // Echec de la copie
439           rc |= 1;
440         } else {
441           // On enregistre le fichier comme etant a detruire
442           files_to_delete.push_back(workdir + "/" + remote);
443         }
444
445       }
446     }
447
448
449
450
451     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
452     // ou si la creation du fils n'a pu avoir lieu
453     if ( (rc == 0) || (child < 0) ) {
454       std::vector<string>::const_iterator it;
455       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
456         string remove_cmd = p_ta->getBatchManager().remove_command(executionhost, *it);
457         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
458         system(remove_cmd.c_str());
459       }
460     }
461
462
463
464     // On retire la fonction de nettoyage de la memoire
465     pthread_cleanup_pop(0);
466
467     // On retire la fonction de suppression du fils
468     pthread_cleanup_pop(0);
469
470
471
472     // On invoque la fonction de nettoyage de la memoire
473     delete_on_exit(arg);
474
475     UNDER_LOCK( cout << "Father is leaving" << endl );
476     pthread_exit(NULL);
477 #endif
478     return NULL;
479   }
480
481
482
483
484   void BatchManager_Local::ThreadAdapter::pere(pid_t child)
485   {
486 #ifndef WIN32 //TODO: porting of following functionality
487     time_t child_starttime = time(NULL);
488
489     // On enregistre le fils dans la table des threads
490     pthread_t thread_id = pthread_self();
491     Id id = _bm.registerThread_id(thread_id);
492
493     Parametre param   = _job.getParametre();
494     Environnement env = _job.getEnvironnement();
495
496     ostringstream thread_id_sst;
497     thread_id_sst << id;
498     param[ID]         = thread_id_sst.str();
499     param[STATE]      = "Running";
500     param[PID]        = child;
501
502     // @@@ --------> SECTION CRITIQUE <-------- @@@
503     pthread_mutex_lock(&_bm._threads_mutex);
504     _bm._threads[id].thread_id = thread_id;
505     _bm._threads[id].pid       = child;
506     _bm._threads[id].status    = RUNNING;
507     _bm._threads[id].param     = param;
508     _bm._threads[id].env       = env;
509     _bm._threads[id].command_queue.push(NOP);
510     pthread_mutex_unlock(&_bm._threads_mutex);
511     // @@@ --------> SECTION CRITIQUE <-------- @@@
512
513
514
515
516
517     // on boucle en attendant que le fils ait termine
518     while (1) {
519       int child_rc = 0;
520       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
521       if (child_wait_rc > 0) {
522         if (WIFSTOPPED(child_rc)) {
523           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED 
524           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment 
525           // desactive car s'il est possible de detecter l'arret d'un process, il est 
526           // plus difficile de detecter sa reprise.
527
528           // Le fils est simplement stoppe
529           // @@@ --------> SECTION CRITIQUE <-------- @@@
530           pthread_mutex_lock(&_bm._threads_mutex);
531           _bm._threads[id].status       = STOPPED;
532           _bm._threads[id].param[STATE] = "Stopped";
533           pthread_mutex_unlock(&_bm._threads_mutex);
534           // @@@ --------> SECTION CRITIQUE <-------- @@@
535           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
536
537         } 
538         else {
539           // Le fils est termine, on sort de la boucle et du if englobant
540           // @@@ --------> SECTION CRITIQUE <-------- @@@
541           pthread_mutex_lock(&_bm._threads_mutex);
542           _bm._threads[id].status       = DONE;
543           _bm._threads[id].param[STATE] = "Done";
544           pthread_mutex_unlock(&_bm._threads_mutex);
545           // @@@ --------> SECTION CRITIQUE <-------- @@@
546           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
547           break;
548         }
549       }
550       else if (child_wait_rc == -1) {
551         // Le fils a disparu ...
552         // @@@ --------> SECTION CRITIQUE <-------- @@@
553         pthread_mutex_lock(&_bm._threads_mutex);
554         _bm._threads[id].status       = DEAD;
555         _bm._threads[id].param[STATE] = "Dead";
556         pthread_mutex_unlock(&_bm._threads_mutex);
557         // @@@ --------> SECTION CRITIQUE <-------- @@@
558         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
559         break;
560       }
561
562
563
564       // On teste si le thread doit etre detruit
565       pthread_testcancel();
566
567
568
569       // On regarde si le fils n'a pas depasse son temps (wallclock time)
570       time_t child_currenttime = time(NULL);
571       time_t child_elapsedtime = child_currenttime - child_starttime;
572       if (param.find(MAXWALLTIME) != param.end()) {
573         int maxwalltime = param[MAXWALLTIME];
574         //        cout << "child_starttime          = " << child_starttime        << endl
575         //             << "child_currenttime        = " << child_currenttime      << endl
576         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
577         //             << "maxwalltime              = " << maxwalltime            << endl
578         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
579         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
580           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
581           // On introduit une commande dans la queue du thread
582           // @@@ --------> SECTION CRITIQUE <-------- @@@
583           pthread_mutex_lock(&_bm._threads_mutex);
584           if (_bm._threads.find(id) != _bm._threads.end()) 
585             _bm._threads[id].command_queue.push(KILL);
586           pthread_mutex_unlock(&_bm._threads_mutex);
587           // @@@ --------> SECTION CRITIQUE <-------- @@@
588
589
590         } else if (child_elapsedtime > maxwalltime ) {
591           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
592           // On introduit une commande dans la queue du thread
593           // @@@ --------> SECTION CRITIQUE <-------- @@@
594           pthread_mutex_lock(&_bm._threads_mutex);
595           if (_bm._threads.find(id) != _bm._threads.end()) 
596             _bm._threads[id].command_queue.push(TERM);
597           pthread_mutex_unlock(&_bm._threads_mutex);
598           // @@@ --------> SECTION CRITIQUE <-------- @@@
599         }
600       }
601
602
603
604       // On regarde s'il y a quelque chose a faire dans la queue de commande
605       // @@@ --------> SECTION CRITIQUE <-------- @@@
606       pthread_mutex_lock(&_bm._threads_mutex);
607       if (_bm._threads.find(id) != _bm._threads.end()) {
608         while (_bm._threads[id].command_queue.size() > 0) {
609           Commande cmd = _bm._threads[id].command_queue.front();
610           _bm._threads[id].command_queue.pop();
611
612           switch (cmd) {
613     case NOP:
614       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
615       break;
616
617     case HOLD:
618       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
619       kill(child, SIGSTOP);
620       break;
621
622     case RELEASE:
623       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
624       kill(child, SIGCONT);
625       break;
626
627     case TERM:
628       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
629       kill(child, SIGTERM);
630       break;
631
632     case KILL:
633       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
634       kill(child, SIGKILL);
635       break;
636
637     case ALTER:
638       break;
639
640     default:
641       break;
642           }
643         }
644
645       }
646       pthread_mutex_unlock(&_bm._threads_mutex);
647       // @@@ --------> SECTION CRITIQUE <-------- @@@
648
649       // On fait une petite pause pour ne pas surcharger inutilement le processeur
650       sleep(1);
651
652     }
653 #endif
654
655
656   }
657
658
659
660
661   void BatchManager_Local::ThreadAdapter::fils()
662   {
663 #ifndef WIN32 //TODO: porting of following functionality
664     Parametre param = _job.getParametre();
665     Parametre::iterator it;
666
667     try {
668
669       // On se place dans le repertoire de travail
670       if ( (it = param.find(WORKDIR)) != param.end() ) {
671         string workdir = static_cast<string>( (*it).second );
672         chdir(workdir.c_str());
673       }
674
675
676
677
678       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
679       char * execpath = NULL;
680       if (param.find(EXECUTABLE) != param.end()) {
681         string executable = _bm.exec_command(param);
682         execpath          = new char [executable.size() + 1];
683         strncpy(execpath, executable.c_str(), executable.size() + 1);
684       } else exit(1); 
685
686       string debug_command = execpath;
687
688       string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
689
690       char **  argv = NULL;
691       if (param.find(ARGUMENTS) != param.end()) {
692         Versatile V = param[ARGUMENTS];
693
694         argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
695
696         argv[0] = new char [name.size() + 1];
697         strncpy(argv[0], name.c_str(), name.size() + 1);
698
699         debug_command  += string(" # ") + argv[0];
700
701         int i = 1;
702         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
703           StringType argt = * static_cast<StringType *>(*it);
704           string     arg  = argt;
705           argv[i]         = new char [arg.size() + 1];
706           strncpy(argv[i], arg.c_str(), arg.size() + 1);
707           debug_command  += string(" # ") + argv[i];
708         }
709
710         // assert (i == V.size() + 1)
711         argv[i] = NULL;
712       }
713
714
715       UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
716
717
718
719       Environnement env = _job.getEnvironnement();
720
721
722       char ** envp = NULL;
723       if(env.size() > 0) {
724         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
725         int i = 0;
726         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
727           const string  & key   = (*it).first;
728           const string  & value = (*it).second;
729           ostringstream oss;
730           oss << key << "=" << value;
731           envp[i]         = new char [oss.str().size() + 1];
732           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
733         }
734
735         // assert (i == env.size())
736         envp[i] = NULL;
737       }
738
739
740
741
742       // On positionne les limites systeme imposees au fils
743       if (param.find(MAXCPUTIME) != param.end()) {
744         int maxcputime = param[MAXCPUTIME];
745         struct rlimit limit;
746         limit.rlim_cur = maxcputime;
747         limit.rlim_max = int(maxcputime * 1.1);
748         setrlimit(RLIMIT_CPU, &limit);
749       }
750
751       if (param.find(MAXDISKSIZE) != param.end()) {
752         int maxdisksize = param[MAXDISKSIZE];
753         struct rlimit limit;
754         limit.rlim_cur = maxdisksize * 1024;
755         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
756         setrlimit(RLIMIT_FSIZE, &limit);
757       }
758
759       if (param.find(MAXRAMSIZE) != param.end()) {
760         int maxramsize = param[MAXRAMSIZE];
761         struct rlimit limit;
762         limit.rlim_cur = maxramsize * 1024;
763         limit.rlim_max = int(maxramsize * 1.1) * 1024;
764         setrlimit(RLIMIT_AS, &limit);
765       }
766
767
768
769       // On cree une session pour le fils de facon a ce qu'il ne soit pas
770       // detruit lorsque le shell se termine (le shell ouvre une session et
771       // tue tous les process appartenant a la session en quittant)
772       setsid();
773
774
775       // On ferme les descripteurs de fichiers standards
776       //close(STDIN_FILENO);
777       //close(STDOUT_FILENO);
778       //close(STDERR_FILENO);
779
780
781       // On execute la commande du fils
782       execve(execpath, argv, envp);
783
784       // No need to deallocate since nothing happens after a successful exec
785
786       // Normalement on ne devrait jamais arriver ici    
787       ofstream file_err("error.log");
788       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
789
790     } catch (GenericException & e) {
791
792       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
793     }
794
795     exit(99);
796 #endif
797   }
798
799
800
801
802   void BatchManager_Local::kill_child_on_exit(void * p_pid)
803   {
804 #ifndef WIN32 
805     //TODO: porting of following functionality
806     pid_t child = * static_cast<pid_t *>(p_pid);
807
808     // On tue le fils
809     kill(child, SIGTERM);
810
811     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
812     // mais cette option n'est pas implementee pour le moment, car il est 
813     // preferable de laisser le process fils se terminer normalement et seul.
814 #endif
815   }
816
817   void BatchManager_Local::delete_on_exit(void * arg)
818   {
819     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
820     delete p_ta;
821   }
822
823 }