]> SALOME platform Git repositories - modules/kernel.git/blob - src/Batch/Batch_BatchManager_Local.cxx
Salome HOME
Porting functionality on Win32 Platform
[modules/kernel.git] / src / Batch / Batch_BatchManager_Local.cxx
1 // Copyright (C) 2005  OPEN CASCADE, CEA, EDF R&D, LEG
2 //           PRINCIPIA R&D, EADS CCR, Lip6, BV, CEDRAT
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either 
6 // version 2.1 of the License.
7 // 
8 // This library is distributed in the hope that it will be useful 
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of 
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
11 // Lesser General Public License for more details.
12 // 
13 // You should have received a copy of the GNU Lesser General Public  
14 // License along with this library; if not, write to the Free Software 
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 // 
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 // 
19 /*
20 * BatchManager_Local.cxx : 
21 *
22 * Auteur : Ivan DUTKA-MALEN - EDF R&D
23 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
24 * Date   : Thu Nov  6 10:17:22 2003
25 * Projet : Salome 2
26 *
27 */
28
29 #include <iostream>
30 #include <fstream>
31 #include <sstream>
32 #include <cstdlib>
33 #include <sys/types.h>
34 #ifdef WIN32
35 # include <direct.h>
36 #else
37 # include <sys/wait.h>
38 # include <unistd.h>
39 #endif
40 #include <ctime>
41 #include <pthread.h>
42 #include <signal.h>
43 #include <errno.h>
44 #include <string.h>
45 #include "Batch_IOMutex.hxx"
46 #include "Batch_BatchManager_Local.hxx"
47
48 namespace Batch {
49
50
51   // Constructeur
52   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host) throw(InvalidArgumentException,ConnexionFailureException) : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(), _thread_id_id_association_cond()
53 #ifndef WIN32 //TODO: porting of following functionality
54     ,_thread_id_id_association()
55 #endif
56   {
57     pthread_mutex_init(&_threads_mutex, NULL);
58     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
59     pthread_cond_init(&_thread_id_id_association_cond, NULL);
60   }
61
62   // Destructeur
63   BatchManager_Local::~BatchManager_Local()
64   {
65     pthread_mutex_destroy(&_threads_mutex);
66     pthread_mutex_destroy(&_thread_id_id_association_mutex);
67     pthread_cond_destroy(&_thread_id_id_association_cond);
68   }
69
70   // Methode pour le controle des jobs : soumet un job au gestionnaire
71   const JobId BatchManager_Local::submitJob(const Job & job)
72   {
73     Job_Local jobLocal = job;
74
75     pthread_t thread_id = submit(jobLocal);
76
77     ostringstream oss;
78     oss << getIdByThread_id(thread_id);
79
80     JobId id(this, oss.str());
81
82     return id;
83   }
84
85   // Methode pour le controle des jobs : retire un job du gestionnaire
86   void BatchManager_Local::deleteJob(const JobId & jobid)
87   {
88     Id id;
89
90     istringstream iss(jobid.getReference());
91     iss >> id;
92
93     // On retrouve le thread_id du thread
94     pthread_t thread_id;
95
96     // @@@ --------> SECTION CRITIQUE <-------- @@@
97     pthread_mutex_lock(&_threads_mutex);
98     if (_threads.find(id) != _threads.end()) 
99       thread_id = _threads[id].thread_id;
100     pthread_mutex_unlock(&_threads_mutex);
101     // @@@ --------> SECTION CRITIQUE <-------- @@@
102
103     cancel(thread_id);
104   }
105
106   // Methode pour le controle des jobs : suspend un job en file d'attente
107   void BatchManager_Local::holdJob(const JobId & jobid)
108   {
109     Id id;
110     istringstream iss(jobid.getReference());
111     iss >> id;
112
113     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
114
115     // On introduit une commande dans la queue du thread
116     // @@@ --------> SECTION CRITIQUE <-------- @@@
117     pthread_mutex_lock(&_threads_mutex);
118     if (_threads.find(id) != _threads.end()) 
119       _threads[id].command_queue.push(HOLD);
120     pthread_mutex_unlock(&_threads_mutex);
121     // @@@ --------> SECTION CRITIQUE <-------- @@@
122   }
123
124   // Methode pour le controle des jobs : relache un job suspendu
125   void BatchManager_Local::releaseJob(const JobId & jobid)
126   {
127     Id id;
128     istringstream iss(jobid.getReference());
129     iss >> id;
130
131     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
132
133     // On introduit une commande dans la queue du thread
134     // @@@ --------> SECTION CRITIQUE <-------- @@@
135     pthread_mutex_lock(&_threads_mutex);
136     if (_threads.find(id) != _threads.end()) 
137       _threads[id].command_queue.push(RELEASE);
138     pthread_mutex_unlock(&_threads_mutex);
139     // @@@ --------> SECTION CRITIQUE <-------- @@@
140   }
141
142
143   // Methode pour le controle des jobs : modifie un job en file d'attente
144   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
145   {
146   }
147
148   // Methode pour le controle des jobs : modifie un job en file d'attente
149   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
150   {
151     alterJob(jobid, param, Environnement());
152   }
153
154   // Methode pour le controle des jobs : modifie un job en file d'attente
155   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
156   {
157     alterJob(jobid, Parametre(), env);
158   }
159
160
161
162   // Methode pour le controle des jobs : renvoie l'etat du job
163   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
164   {
165     Id id;
166     istringstream iss(jobid.getReference());
167     iss >> id;
168
169     Parametre param;
170     Environnement env;
171
172     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
173     // @@@ --------> SECTION CRITIQUE <-------- @@@
174     pthread_mutex_lock(&_threads_mutex);
175     param = _threads[id].param;
176     env   = _threads[id].env;
177     pthread_mutex_unlock(&_threads_mutex);
178     // @@@ --------> SECTION CRITIQUE <-------- @@@
179     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
180
181     JobInfo_Local ji(param, env);
182     return ji;
183   }
184
185
186
187   // Methode pour le controle des jobs : teste si un job est present en machine
188   bool BatchManager_Local::isRunning(const JobId & jobid)
189   {
190     Id id;
191     istringstream iss(jobid.getReference());
192     iss >> id;
193
194     Status status;
195
196     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
197     // @@@ --------> SECTION CRITIQUE <-------- @@@
198     pthread_mutex_lock(&_threads_mutex);
199     status = _threads[id].status;
200     pthread_mutex_unlock(&_threads_mutex);
201     // @@@ --------> SECTION CRITIQUE <-------- @@@
202     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
203
204     return (status == RUNNING);
205   }
206
207
208   // Methode d'execution d'un job
209   pthread_t BatchManager_Local::submit(const Job_Local & job)
210   {
211     // L'id du thread a creer
212     pthread_t thread_id = 
213 #ifdef WIN32
214     {0,0};
215 #else
216       0;
217 #endif
218
219     // Les attributs du thread a sa creation
220     pthread_attr_t thread_attr;
221     pthread_attr_init(&thread_attr);
222     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
223
224     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
225
226     // Creation du thread qui va executer la commande systeme qu'on lui passe
227     int rc = pthread_create(&thread_id, 
228       &thread_attr, 
229       &ThreadAdapter::run, 
230       static_cast<void *>(p_ta));
231     if (rc) {
232     }
233
234     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
235     pthread_attr_destroy(&thread_attr);
236
237     return thread_id;
238   }
239
240
241   // Methode de destruction d'un job
242   void BatchManager_Local::cancel(pthread_t thread_id)
243   {
244     pthread_cancel(thread_id);
245   }
246
247
248   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique 
249   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
250   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
251   // Thread_id / Id (_thread_id_id_association_mutex)
252   BatchManager_Local::Id BatchManager_Local::nextId() 
253   {
254     static Id id = 0;
255     Id nextId = id++;
256     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
257     return nextId;
258   }
259
260
261   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
262   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
263   {
264     Id id = -1;
265
266     // @@@ --------> SECTION CRITIQUE <-------- @@@
267     pthread_mutex_lock(&_thread_id_id_association_mutex);
268 #ifndef WIN32 //TODO: porting of following functionality
269     while (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) 
270       pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
271
272     id = _thread_id_id_association[thread_id];
273     _thread_id_id_association.erase(thread_id);
274 #endif
275
276     pthread_mutex_unlock(&_thread_id_id_association_mutex);
277     // @@@ --------> SECTION CRITIQUE <-------- @@@
278
279     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
280     return id;
281   }
282
283
284   // Associe un Thread_id a un Id nouvellement cree
285   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) 
286   {
287     Id id = -1;
288
289     // @@@ --------> SECTION CRITIQUE <-------- @@@
290     pthread_mutex_lock(&_thread_id_id_association_mutex);
291 #ifndef WIN32 //TODO: porting of following functionality
292     if (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) {
293       id = _thread_id_id_association[thread_id] = nextId();
294       pthread_cond_signal(&_thread_id_id_association_cond);
295
296     } else {
297       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
298     }
299 #endif
300     pthread_mutex_unlock(&_thread_id_id_association_mutex);
301     // @@@ --------> SECTION CRITIQUE <-------- @@@
302
303     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
304     return id;
305   }
306
307
308   // Constructeur de la classe ThreadAdapter
309   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
310   _bm(bm), _job(job)
311   {
312     // Nothing to do
313   }
314
315
316
317   // Methode d'execution du thread
318   void * BatchManager_Local::ThreadAdapter::run(void * arg)
319   {
320 #ifndef WIN32 //TODO: porting of following functionality
321     // On bloque tous les signaux pour ce thread
322     sigset_t setmask;
323     sigfillset(&setmask);
324     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
325
326     // On autorise la terminaison differee du thread
327     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
328     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
329     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
330
331     // On enregistre la fonction de suppression du fils en cas d'arret du thread
332     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
333     // sera prise en compte par pthread_testcancel()
334     pid_t child;
335     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
336     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
337
338     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
339
340
341
342
343     // Le code retour cumule (ORed) de tous les appels
344     // Nul en cas de reussite de l'ensemble des operations
345     int rc = 0;
346
347     // Cette table contient la liste des fichiers a detruire a la fin du processus
348     std::vector<string> files_to_delete;
349
350
351
352     // On copie les fichiers d'entree pour le fils
353     const Parametre param   = p_ta->_job.getParametre();
354     Parametre::const_iterator it;
355
356     // On initialise la variable workdir a la valeur du Current Working Directory
357     char * cwd = 
358 #ifdef WIN32
359       _getcwd(NULL, 0);
360 #else
361       new char [PATH_MAX];
362     getcwd(cwd, PATH_MAX);
363 #endif
364     string workdir = cwd;
365     delete [] cwd;
366
367     if ( (it = param.find(WORKDIR)) != param.end() ) {
368       workdir = static_cast<string>( (*it).second );
369     }
370
371     string executionhost = string(param[EXECUTIONHOST]);
372
373     if ( (it = param.find(INFILE)) != param.end() ) {
374       Versatile V = (*it).second;
375       Versatile::iterator Vit;
376
377       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
378         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
379         Couple cp       = cpt;
380         string local    = cp.getLocal();
381         string remote   = cp.getRemote();
382
383         string copy_cmd = p_ta->getBatchManager().copy_command("", local, executionhost, workdir + "/" + remote);
384         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
385
386         if (system(copy_cmd.c_str()) ) {
387           // Echec de la copie
388           rc |= 1;
389         } else {
390           // On enregistre le fichier comme etant a detruire
391           files_to_delete.push_back(workdir + "/" + remote);
392         }
393
394       }
395     }
396
397
398
399
400 #ifdef WIN32
401     //TODO
402     //Using CreateThread instead fork() POSIX function
403 #else
404     // On forke/exec un nouveau process pour pouvoir controler le fils
405     // (plus finement qu'avec un appel system)
406     // int rc = system(commande.c_str());
407     child = fork();
408     if (child < 0) { // erreur
409       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
410
411     } else if (child > 0) { // pere
412       p_ta->pere(child);
413
414     } else { // fils
415       p_ta->fils();
416     }
417 #endif
418
419
420
421     // On copie les fichiers de sortie du fils
422     if ( (it = param.find(OUTFILE)) != param.end() ) {
423       Versatile V = (*it).second;
424       Versatile::iterator Vit;
425
426       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
427         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
428         Couple cp       = cpt;
429         string local    = cp.getLocal();
430         string remote   = cp.getRemote();
431
432         string copy_cmd = p_ta->getBatchManager().copy_command(executionhost, workdir + "/" + remote, "", local);
433         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
434
435         if (system(copy_cmd.c_str()) ) {
436           // Echec de la copie
437           rc |= 1;
438         } else {
439           // On enregistre le fichier comme etant a detruire
440           files_to_delete.push_back(workdir + "/" + remote);
441         }
442
443       }
444     }
445
446
447
448
449     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
450     // ou si la creation du fils n'a pu avoir lieu
451     if ( (rc == 0) || (child < 0) ) {
452       std::vector<string>::const_iterator it;
453       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
454         string remove_cmd = p_ta->getBatchManager().remove_command(executionhost, *it);
455         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
456         system(remove_cmd.c_str());
457       }
458     }
459
460
461
462     // On retire la fonction de nettoyage de la memoire
463     pthread_cleanup_pop(0);
464
465     // On retire la fonction de suppression du fils
466     pthread_cleanup_pop(0);
467
468
469
470     // On invoque la fonction de nettoyage de la memoire
471     delete_on_exit(arg);
472
473     UNDER_LOCK( cout << "Father is leaving" << endl );
474     pthread_exit(NULL);
475 #endif
476     return NULL;
477   }
478
479
480
481
482   void BatchManager_Local::ThreadAdapter::pere(pid_t child)
483   {
484 #ifndef WIN32 //TODO: porting of following functionality
485     time_t child_starttime = time(NULL);
486
487     // On enregistre le fils dans la table des threads
488     pthread_t thread_id = pthread_self();
489     Id id = _bm.registerThread_id(thread_id);
490
491     Parametre param   = _job.getParametre();
492     Environnement env = _job.getEnvironnement();
493
494     ostringstream thread_id_sst;
495     thread_id_sst << id;
496     param[ID]         = thread_id_sst.str();
497     param[STATE]      = "Running";
498     param[PID]        = child;
499
500     // @@@ --------> SECTION CRITIQUE <-------- @@@
501     pthread_mutex_lock(&_bm._threads_mutex);
502     _bm._threads[id].thread_id = thread_id;
503     _bm._threads[id].pid       = child;
504     _bm._threads[id].status    = RUNNING;
505     _bm._threads[id].param     = param;
506     _bm._threads[id].env       = env;
507     _bm._threads[id].command_queue.push(NOP);
508     pthread_mutex_unlock(&_bm._threads_mutex);
509     // @@@ --------> SECTION CRITIQUE <-------- @@@
510
511
512
513
514
515     // on boucle en attendant que le fils ait termine
516     while (1) {
517       int child_rc = 0;
518       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
519       if (child_wait_rc > 0) {
520         if (WIFSTOPPED(child_rc)) {
521           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED 
522           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment 
523           // desactive car s'il est possible de detecter l'arret d'un process, il est 
524           // plus difficile de detecter sa reprise.
525
526           // Le fils est simplement stoppe
527           // @@@ --------> SECTION CRITIQUE <-------- @@@
528           pthread_mutex_lock(&_bm._threads_mutex);
529           _bm._threads[id].status       = STOPPED;
530           _bm._threads[id].param[STATE] = "Stopped";
531           pthread_mutex_unlock(&_bm._threads_mutex);
532           // @@@ --------> SECTION CRITIQUE <-------- @@@
533           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
534
535         } 
536         else {
537           // Le fils est termine, on sort de la boucle et du if englobant
538           // @@@ --------> SECTION CRITIQUE <-------- @@@
539           pthread_mutex_lock(&_bm._threads_mutex);
540           _bm._threads[id].status       = DONE;
541           _bm._threads[id].param[STATE] = "Done";
542           pthread_mutex_unlock(&_bm._threads_mutex);
543           // @@@ --------> SECTION CRITIQUE <-------- @@@
544           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
545           break;
546         }
547       }
548       else if (child_wait_rc == -1) {
549         // Le fils a disparu ...
550         // @@@ --------> SECTION CRITIQUE <-------- @@@
551         pthread_mutex_lock(&_bm._threads_mutex);
552         _bm._threads[id].status       = DEAD;
553         _bm._threads[id].param[STATE] = "Dead";
554         pthread_mutex_unlock(&_bm._threads_mutex);
555         // @@@ --------> SECTION CRITIQUE <-------- @@@
556         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
557         break;
558       }
559
560
561
562       // On teste si le thread doit etre detruit
563       pthread_testcancel();
564
565
566
567       // On regarde si le fils n'a pas depasse son temps (wallclock time)
568       time_t child_currenttime = time(NULL);
569       time_t child_elapsedtime = child_currenttime - child_starttime;
570       if (param.find(MAXWALLTIME) != param.end()) {
571         int maxwalltime = param[MAXWALLTIME];
572         //        cout << "child_starttime          = " << child_starttime        << endl
573         //             << "child_currenttime        = " << child_currenttime      << endl
574         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
575         //             << "maxwalltime              = " << maxwalltime            << endl
576         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
577         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
578           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
579           // On introduit une commande dans la queue du thread
580           // @@@ --------> SECTION CRITIQUE <-------- @@@
581           pthread_mutex_lock(&_bm._threads_mutex);
582           if (_bm._threads.find(id) != _bm._threads.end()) 
583             _bm._threads[id].command_queue.push(KILL);
584           pthread_mutex_unlock(&_bm._threads_mutex);
585           // @@@ --------> SECTION CRITIQUE <-------- @@@
586
587
588         } else if (child_elapsedtime > maxwalltime ) {
589           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
590           // On introduit une commande dans la queue du thread
591           // @@@ --------> SECTION CRITIQUE <-------- @@@
592           pthread_mutex_lock(&_bm._threads_mutex);
593           if (_bm._threads.find(id) != _bm._threads.end()) 
594             _bm._threads[id].command_queue.push(TERM);
595           pthread_mutex_unlock(&_bm._threads_mutex);
596           // @@@ --------> SECTION CRITIQUE <-------- @@@
597         }
598       }
599
600
601
602       // On regarde s'il y a quelque chose a faire dans la queue de commande
603       // @@@ --------> SECTION CRITIQUE <-------- @@@
604       pthread_mutex_lock(&_bm._threads_mutex);
605       if (_bm._threads.find(id) != _bm._threads.end()) {
606         while (_bm._threads[id].command_queue.size() > 0) {
607           Commande cmd = _bm._threads[id].command_queue.front();
608           _bm._threads[id].command_queue.pop();
609
610           switch (cmd) {
611     case NOP:
612       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
613       break;
614
615     case HOLD:
616       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
617       kill(child, SIGSTOP);
618       break;
619
620     case RELEASE:
621       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
622       kill(child, SIGCONT);
623       break;
624
625     case TERM:
626       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
627       kill(child, SIGTERM);
628       break;
629
630     case KILL:
631       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
632       kill(child, SIGKILL);
633       break;
634
635     case ALTER:
636       break;
637
638     default:
639       break;
640           }
641         }
642
643       }
644       pthread_mutex_unlock(&_bm._threads_mutex);
645       // @@@ --------> SECTION CRITIQUE <-------- @@@
646
647       // On fait une petite pause pour ne pas surcharger inutilement le processeur
648       sleep(1);
649
650     }
651 #endif
652
653
654   }
655
656
657
658
659   void BatchManager_Local::ThreadAdapter::fils()
660   {
661 #ifndef WIN32 //TODO: porting of following functionality
662     Parametre param = _job.getParametre();
663     Parametre::iterator it;
664
665     try {
666
667       // On se place dans le repertoire de travail
668       if ( (it = param.find(WORKDIR)) != param.end() ) {
669         string workdir = static_cast<string>( (*it).second );
670         chdir(workdir.c_str());
671       }
672
673
674
675
676       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
677       char * execpath = NULL;
678       if (param.find(EXECUTABLE) != param.end()) {
679         string executable = _bm.exec_command(param);
680         execpath          = new char [executable.size() + 1];
681         strncpy(execpath, executable.c_str(), executable.size() + 1);
682       } else exit(1); 
683
684       string debug_command = execpath;
685
686       string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
687
688       char **  argv = NULL;
689       if (param.find(ARGUMENTS) != param.end()) {
690         Versatile V = param[ARGUMENTS];
691
692         argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
693
694         argv[0] = new char [name.size() + 1];
695         strncpy(argv[0], name.c_str(), name.size() + 1);
696
697         debug_command  += string(" # ") + argv[0];
698
699         int i = 1;
700         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
701           StringType argt = * static_cast<StringType *>(*it);
702           string     arg  = argt;
703           argv[i]         = new char [arg.size() + 1];
704           strncpy(argv[i], arg.c_str(), arg.size() + 1);
705           debug_command  += string(" # ") + argv[i];
706         }
707
708         // assert (i == V.size() + 1)
709         argv[i] = NULL;
710       }
711
712
713       UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
714
715
716
717       Environnement env = _job.getEnvironnement();
718
719
720       char ** envp = NULL;
721       if(env.size() > 0) {
722         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
723         int i = 0;
724         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
725           const string  & key   = (*it).first;
726           const string  & value = (*it).second;
727           ostringstream oss;
728           oss << key << "=" << value;
729           envp[i]         = new char [oss.str().size() + 1];
730           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
731         }
732
733         // assert (i == env.size())
734         envp[i] = NULL;
735       }
736
737
738
739
740       // On positionne les limites systeme imposees au fils
741       if (param.find(MAXCPUTIME) != param.end()) {
742         int maxcputime = param[MAXCPUTIME];
743         struct rlimit limit;
744         limit.rlim_cur = maxcputime;
745         limit.rlim_max = int(maxcputime * 1.1);
746         setrlimit(RLIMIT_CPU, &limit);
747       }
748
749       if (param.find(MAXDISKSIZE) != param.end()) {
750         int maxdisksize = param[MAXDISKSIZE];
751         struct rlimit limit;
752         limit.rlim_cur = maxdisksize * 1024;
753         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
754         setrlimit(RLIMIT_FSIZE, &limit);
755       }
756
757       if (param.find(MAXRAMSIZE) != param.end()) {
758         int maxramsize = param[MAXRAMSIZE];
759         struct rlimit limit;
760         limit.rlim_cur = maxramsize * 1024;
761         limit.rlim_max = int(maxramsize * 1.1) * 1024;
762         setrlimit(RLIMIT_AS, &limit);
763       }
764
765
766
767       // On cree une session pour le fils de facon a ce qu'il ne soit pas
768       // detruit lorsque le shell se termine (le shell ouvre une session et
769       // tue tous les process appartenant a la session en quittant)
770       setsid();
771
772
773       // On ferme les descripteurs de fichiers standards
774       //close(STDIN_FILENO);
775       //close(STDOUT_FILENO);
776       //close(STDERR_FILENO);
777
778
779       // On execute la commande du fils
780       execve(execpath, argv, envp);
781
782       // No need to deallocate since nothing happens after a successful exec
783
784       // Normalement on ne devrait jamais arriver ici    
785       ofstream file_err("error.log");
786       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
787
788     } catch (GenericException & e) {
789
790       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
791     }
792
793     exit(99);
794 #endif
795   }
796
797
798
799
800   void BatchManager_Local::kill_child_on_exit(void * p_pid)
801   {
802 #ifndef WIN32 
803     //TODO: porting of following functionality
804     pid_t child = * static_cast<pid_t *>(p_pid);
805
806     // On tue le fils
807     kill(child, SIGTERM);
808
809     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
810     // mais cette option n'est pas implementee pour le moment, car il est 
811     // preferable de laisser le process fils se terminer normalement et seul.
812 #endif
813   }
814
815   void BatchManager_Local::delete_on_exit(void * arg)
816   {
817     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
818     delete p_ta;
819   }
820
821 }