Salome HOME
Fixed bug with threads in BatchManager_Local.
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53 #include "Batch_IOMutex.hxx"
54 #include "Batch_BatchManager_Local.hxx"
55 #include "Batch_RunTimeException.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65     : BatchManager(parent, host), _connect(0),
66       _protocol(CommunicationProtocol::getInstance(protocolType)),
67       _idCounter(0)
68   {
69     pthread_mutex_init(&_threads_mutex, NULL);
70     pthread_cond_init(&_threadLaunchCondition, NULL);
71   }
72
73   // Destructeur
74   BatchManager_Local::~BatchManager_Local()
75   {
76     pthread_mutex_destroy(&_threads_mutex);
77     pthread_cond_destroy(&_threadLaunchCondition);
78   }
79
80   const CommunicationProtocol & BatchManager_Local::getProtocol() const
81   {
82     return _protocol;
83   }
84
85   // Methode pour le controle des jobs : soumet un job au gestionnaire
86   const JobId BatchManager_Local::submitJob(const Job & job)
87   {
88     Job_Local jobLocal = job;
89     Id id = _idCounter++;
90     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
91
92     // Les attributs du thread a sa creation
93     pthread_attr_t thread_attr;
94     pthread_attr_init(&thread_attr);
95     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
96
97     // Creation du thread qui va executer la commande systeme qu'on lui passe
98     pthread_t thread_id;
99     pthread_mutex_lock(&_threads_mutex);
100     int rc = pthread_create(&thread_id,
101       &thread_attr,
102       &ThreadAdapter::run,
103       static_cast<void *>(p_ta));
104
105     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
106     pthread_attr_destroy(&thread_attr);
107
108     if (rc != 0) {
109       pthread_mutex_unlock(&_threads_mutex);
110       throw RunTimeException("Can't create new thread in BatchManager_Local");
111     }
112
113     pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex);
114     pthread_mutex_unlock(&_threads_mutex);
115
116     ostringstream id_sst;
117     id_sst << id;
118     return JobId(this, id_sst.str());
119   }
120
121   // Methode pour le controle des jobs : retire un job du gestionnaire
122   void BatchManager_Local::deleteJob(const JobId & jobid)
123   {
124     Id id;
125
126     istringstream iss(jobid.getReference());
127     iss >> id;
128
129     // On retrouve le thread_id du thread
130     pthread_t thread_id;
131
132     // @@@ --------> SECTION CRITIQUE <-------- @@@
133     pthread_mutex_lock(&_threads_mutex);
134     if (_threads.find(id) != _threads.end())
135       thread_id = _threads[id].thread_id;
136     pthread_mutex_unlock(&_threads_mutex);
137     // @@@ --------> SECTION CRITIQUE <-------- @@@
138
139     cancel(thread_id);
140   }
141
142   // Methode pour le controle des jobs : suspend un job en file d'attente
143   void BatchManager_Local::holdJob(const JobId & jobid)
144   {
145     Id id;
146     istringstream iss(jobid.getReference());
147     iss >> id;
148
149     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
150
151     // On introduit une commande dans la queue du thread
152     // @@@ --------> SECTION CRITIQUE <-------- @@@
153     pthread_mutex_lock(&_threads_mutex);
154     if (_threads.find(id) != _threads.end())
155       _threads[id].command_queue.push(HOLD);
156     pthread_mutex_unlock(&_threads_mutex);
157     // @@@ --------> SECTION CRITIQUE <-------- @@@
158   }
159
160   // Methode pour le controle des jobs : relache un job suspendu
161   void BatchManager_Local::releaseJob(const JobId & jobid)
162   {
163     Id id;
164     istringstream iss(jobid.getReference());
165     iss >> id;
166
167     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
168
169     // On introduit une commande dans la queue du thread
170     // @@@ --------> SECTION CRITIQUE <-------- @@@
171     pthread_mutex_lock(&_threads_mutex);
172     if (_threads.find(id) != _threads.end())
173       _threads[id].command_queue.push(RELEASE);
174     pthread_mutex_unlock(&_threads_mutex);
175     // @@@ --------> SECTION CRITIQUE <-------- @@@
176   }
177
178
179   // Methode pour le controle des jobs : modifie un job en file d'attente
180   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
181   {
182   }
183
184   // Methode pour le controle des jobs : modifie un job en file d'attente
185   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
186   {
187     alterJob(jobid, param, Environnement());
188   }
189
190   // Methode pour le controle des jobs : modifie un job en file d'attente
191   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
192   {
193     alterJob(jobid, Parametre(), env);
194   }
195
196
197
198   // Methode pour le controle des jobs : renvoie l'etat du job
199   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
200   {
201     Id id;
202     istringstream iss(jobid.getReference());
203     iss >> id;
204
205     Parametre param;
206     Environnement env;
207
208     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
209     // @@@ --------> SECTION CRITIQUE <-------- @@@
210     pthread_mutex_lock(&_threads_mutex);
211     std::map<Id, Child >::iterator pos = _threads.find(id);
212     bool found = (pos != _threads.end());
213     if (found) {
214       param = pos->second.param;
215       env   = pos->second.env;
216     }
217     pthread_mutex_unlock(&_threads_mutex);
218     // @@@ --------> SECTION CRITIQUE <-------- @@@
219     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
220
221     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
222
223     JobInfo_Local ji(param, env);
224     return ji;
225   }
226
227
228
229   // Methode pour le controle des jobs : teste si un job est present en machine
230   bool BatchManager_Local::isRunning(const JobId & jobid)
231   {
232     Id id;
233     istringstream iss(jobid.getReference());
234     iss >> id;
235
236     Status status;
237
238     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
239     // @@@ --------> SECTION CRITIQUE <-------- @@@
240     pthread_mutex_lock(&_threads_mutex);
241     status = _threads[id].status;
242     pthread_mutex_unlock(&_threads_mutex);
243     // @@@ --------> SECTION CRITIQUE <-------- @@@
244     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
245
246     return (status == RUNNING);
247   }
248
249   // Methode de destruction d'un job
250   void BatchManager_Local::cancel(pthread_t thread_id)
251   {
252     pthread_cancel(thread_id);
253   }
254
255
256   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
257   {
258     ostringstream exec_sub_cmd;
259
260 #ifdef WIN32
261     char drive[_MAX_DRIVE];
262     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
263     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
264 #endif
265
266     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
267
268     if (param.find(ARGUMENTS) != param.end()) {
269       Versatile V = param[ARGUMENTS];
270       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
271         StringType argt = * static_cast<StringType *>(*it);
272         string     arg  = argt;
273         exec_sub_cmd << " " << arg;
274       }
275     }
276
277     string user;
278     Parametre::const_iterator it = param.find(USER);
279     if (it != param.end()) {
280       user = string(it->second);
281     }
282
283     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
284   }
285
286
287
288   // Constructeur de la classe ThreadAdapter
289   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
290   _bm(bm), _job(job), _id(id)
291   {
292     // Nothing to do
293   }
294
295
296
297   // Methode d'execution du thread
298   void * BatchManager_Local::ThreadAdapter::run(void * arg)
299   {
300 #ifndef WIN32
301     // On bloque tous les signaux pour ce thread
302     sigset_t setmask;
303     sigfillset(&setmask);
304     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
305 #endif
306
307     // On autorise la terminaison differee du thread
308     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
309     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
310     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
311
312     // On enregistre la fonction de suppression du fils en cas d'arret du thread
313     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
314     // sera prise en compte par pthread_testcancel()
315     Process child;
316     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
317     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
318
319     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
320
321
322
323
324     // Le code retour cumule (ORed) de tous les appels
325     // Nul en cas de reussite de l'ensemble des operations
326     int rc = 0;
327
328     // Cette table contient la liste des fichiers a detruire a la fin du processus
329     std::vector<string> files_to_delete;
330
331
332
333     // On copie les fichiers d'entree pour le fils
334     const Parametre param   = p_ta->_job.getParametre();
335     Parametre::const_iterator it;
336
337     // On initialise la variable workdir a la valeur du Current Working Directory
338     char * cwd =
339 #ifdef WIN32
340       _getcwd(NULL, 0);
341 #else
342       new char [PATH_MAX];
343     getcwd(cwd, PATH_MAX);
344 #endif
345     string workdir = cwd;
346     delete [] cwd;
347
348     if ( (it = param.find(WORKDIR)) != param.end() ) {
349       workdir = static_cast<string>( (*it).second );
350     }
351
352     string executionhost = string(param[EXECUTIONHOST]);
353     string user;
354     if ( (it = param.find(USER)) != param.end() ) {
355       user = string(it->second);
356     }
357
358     if ( (it = param.find(INFILE)) != param.end() ) {
359       Versatile V = (*it).second;
360       Versatile::iterator Vit;
361
362       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
363         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
364         Couple cp       = cpt;
365         string local    = cp.getLocal();
366         string remote   = cp.getRemote();
367
368         std::cerr << workdir << std::endl;
369         std::cerr << remote << std::endl;
370
371         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
372                                                                     workdir + "/" + remote,
373                                                                     executionhost, user);
374         if (status) {
375           // Echec de la copie
376           rc |= 1;
377         } else {
378           // On enregistre le fichier comme etant a detruire
379           files_to_delete.push_back(workdir + "/" + remote);
380         }
381
382       }
383     }
384
385
386
387
388     // On forke/exec un nouveau process pour pouvoir controler le fils
389     // (plus finement qu'avec un appel system)
390     // int rc = system(commande.c_str());
391     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
392     //execv("/usr/bin/ssh", parmList);
393 #ifdef WIN32
394     child = p_ta->launchWin32ChildProcess();
395     p_ta->pere(child);
396 #else
397     child = fork();
398     if (child < 0) { // erreur
399       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
400
401     } else if (child > 0) { // pere
402       p_ta->pere(child);
403
404     } else { // fils
405       p_ta->fils();
406     }
407 #endif
408
409
410     // On copie les fichiers de sortie du fils
411     if ( (it = param.find(OUTFILE)) != param.end() ) {
412       Versatile V = (*it).second;
413       Versatile::iterator Vit;
414
415       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
416         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
417         Couple cp       = cpt;
418         string local    = cp.getLocal();
419         string remote   = cp.getRemote();
420
421         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
422                                                                     executionhost, user,
423                                                                     local, "", "");
424         if (status) {
425           // Echec de la copie
426           rc |= 1;
427         } else {
428           // On enregistre le fichier comme etant a detruire
429           files_to_delete.push_back(workdir + "/" + remote);
430         }
431
432       }
433     }
434
435     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
436     // ou si la creation du fils n'a pu avoir lieu
437     if ( (rc == 0) || (child < 0) ) {
438       std::vector<string>::const_iterator it;
439       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
440         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
441 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
442         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
443 #ifdef WIN32
444         remove_cmd = string("\"") + remove_cmd + string("\"");
445 #endif
446         system(remove_cmd.c_str());*/
447       }
448     }
449
450
451
452     // On retire la fonction de nettoyage de la memoire
453     pthread_cleanup_pop(0);
454
455     // On retire la fonction de suppression du fils
456     pthread_cleanup_pop(0);
457
458
459
460     // On invoque la fonction de nettoyage de la memoire
461     delete_on_exit(arg);
462
463     UNDER_LOCK( cout << "Father is leaving" << endl );
464     pthread_exit(NULL);
465     return NULL;
466   }
467
468
469
470
471   void BatchManager_Local::ThreadAdapter::pere(Process child)
472   {
473     time_t child_starttime = time(NULL);
474
475     // On enregistre le fils dans la table des threads
476     pthread_t thread_id = pthread_self();
477
478     Parametre param   = _job.getParametre();
479     Environnement env = _job.getEnvironnement();
480
481     ostringstream id_sst;
482     id_sst << _id;
483     param[ID]         = id_sst.str();
484     param[STATE]      = Batch::RUNNING;
485 #ifndef WIN32
486     param[PID]        = child;
487 #endif
488
489     _bm._threads[_id].thread_id = thread_id;
490 #ifndef WIN32
491     _bm._threads[_id].pid       = child;
492 #endif
493     _bm._threads[_id].status    = RUNNING;
494     _bm._threads[_id].param     = param;
495     _bm._threads[_id].env       = env;
496     _bm._threads[_id].command_queue.push(NOP);
497
498     // Unlock the master thread. From here, all shared variables must be protected
499     // from concurrent access
500     pthread_cond_signal(&_bm._threadLaunchCondition);
501
502
503     // on boucle en attendant que le fils ait termine
504     while (1) {
505 #ifdef WIN32
506       DWORD exitCode;
507       BOOL res = GetExitCodeProcess(child, &exitCode);
508       if (exitCode != STILL_ACTIVE) {
509         pthread_mutex_lock(&_bm._threads_mutex);
510         _bm._threads[_id].status       = DONE;
511         _bm._threads[_id].param[STATE] = Batch::FINISHED;
512         pthread_mutex_unlock(&_bm._threads_mutex);
513         // @@@ --------> SECTION CRITIQUE <-------- @@@
514         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
515         break;
516       }
517 #else
518       int child_rc = 0;
519       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
520       if (child_wait_rc > 0) {
521          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
522          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
523          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
524          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
525          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
526          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
527          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
528 #ifdef WIFCONTINUED
529          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
530 #endif
531         if (WIFSTOPPED(child_rc)) {
532           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
533           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
534           // desactive car s'il est possible de detecter l'arret d'un process, il est
535           // plus difficile de detecter sa reprise.
536
537           // Le fils est simplement stoppe
538           // @@@ --------> SECTION CRITIQUE <-------- @@@
539           pthread_mutex_lock(&_bm._threads_mutex);
540           _bm._threads[_id].status       = STOPPED;
541           _bm._threads[_id].param[STATE] = Batch::PAUSED;
542           pthread_mutex_unlock(&_bm._threads_mutex);
543           // @@@ --------> SECTION CRITIQUE <-------- @@@
544           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
545
546         }
547         else {
548           // Le fils est termine, on sort de la boucle et du if englobant
549           // @@@ --------> SECTION CRITIQUE <-------- @@@
550           pthread_mutex_lock(&_bm._threads_mutex);
551           _bm._threads[_id].status       = DONE;
552           _bm._threads[_id].param[STATE] = Batch::FINISHED;
553           pthread_mutex_unlock(&_bm._threads_mutex);
554           // @@@ --------> SECTION CRITIQUE <-------- @@@
555           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
556           break;
557         }
558       }
559       else if (child_wait_rc == -1) {
560         // Le fils a disparu ...
561         // @@@ --------> SECTION CRITIQUE <-------- @@@
562         pthread_mutex_lock(&_bm._threads_mutex);
563         _bm._threads[_id].status       = DEAD;
564         _bm._threads[_id].param[STATE] = Batch::FAILED;
565         pthread_mutex_unlock(&_bm._threads_mutex);
566         // @@@ --------> SECTION CRITIQUE <-------- @@@
567         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
568         break;
569       }
570 #endif
571
572       // On teste si le thread doit etre detruit
573       pthread_testcancel();
574
575
576
577       // On regarde si le fils n'a pas depasse son temps (wallclock time)
578       time_t child_currenttime = time(NULL);
579       time_t child_elapsedtime = child_currenttime - child_starttime;
580       if (param.find(MAXWALLTIME) != param.end()) {
581         int maxwalltime = param[MAXWALLTIME];
582         //        cout << "child_starttime          = " << child_starttime        << endl
583         //             << "child_currenttime        = " << child_currenttime      << endl
584         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
585         //             << "maxwalltime              = " << maxwalltime            << endl
586         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
587         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
588           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
589           // On introduit une commande dans la queue du thread
590           // @@@ --------> SECTION CRITIQUE <-------- @@@
591           pthread_mutex_lock(&_bm._threads_mutex);
592           if (_bm._threads.find(_id) != _bm._threads.end())
593             _bm._threads[_id].command_queue.push(KILL);
594           pthread_mutex_unlock(&_bm._threads_mutex);
595           // @@@ --------> SECTION CRITIQUE <-------- @@@
596
597
598         } else if (child_elapsedtime > maxwalltime ) {
599           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
600           // On introduit une commande dans la queue du thread
601           // @@@ --------> SECTION CRITIQUE <-------- @@@
602           pthread_mutex_lock(&_bm._threads_mutex);
603           if (_bm._threads.find(_id) != _bm._threads.end())
604             _bm._threads[_id].command_queue.push(TERM);
605           pthread_mutex_unlock(&_bm._threads_mutex);
606           // @@@ --------> SECTION CRITIQUE <-------- @@@
607         }
608       }
609
610
611
612       // On regarde s'il y a quelque chose a faire dans la queue de commande
613       // @@@ --------> SECTION CRITIQUE <-------- @@@
614       pthread_mutex_lock(&_bm._threads_mutex);
615       if (_bm._threads.find(_id) != _bm._threads.end()) {
616         while (_bm._threads[_id].command_queue.size() > 0) {
617           Commande cmd = _bm._threads[_id].command_queue.front();
618           _bm._threads[_id].command_queue.pop();
619
620           switch (cmd) {
621     case NOP:
622       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
623       break;
624 #ifndef WIN32
625     case HOLD:
626       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
627       kill(child, SIGSTOP);
628       break;
629
630     case RELEASE:
631       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
632       kill(child, SIGCONT);
633       break;
634
635     case TERM:
636       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
637       kill(child, SIGTERM);
638       break;
639
640     case KILL:
641       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
642       kill(child, SIGKILL);
643       break;
644 #endif
645     case ALTER:
646       break;
647
648     default:
649       break;
650           }
651         }
652
653       }
654       pthread_mutex_unlock(&_bm._threads_mutex);
655       // @@@ --------> SECTION CRITIQUE <-------- @@@
656
657       // On fait une petite pause pour ne pas surcharger inutilement le processeur
658 #ifdef WIN32
659       Sleep(1000);
660 #else
661       sleep(1);
662 #endif
663
664     }
665
666   }
667
668
669
670 #ifndef WIN32
671
672   void BatchManager_Local::ThreadAdapter::fils()
673   {
674     Parametre param = _job.getParametre();
675     Parametre::iterator it;
676
677       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
678       //int result = execv("/usr/bin/ssh", parmList);
679       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
680       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
681     try {
682
683       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
684       vector<string> command;
685       if (param.find(EXECUTABLE) != param.end()) {
686         command = _bm.exec_command(param);
687       } else exit(1);
688
689       // Build the argument array argv from the command
690       char ** argv = new char * [command.size() + 1];
691       string comstr;
692       for (string::size_type i=0 ; i<command.size() ; i++) {
693         argv[i] = new char[command[i].size() + 1];
694         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
695         if (i>0) comstr += " # ";
696         comstr += command[i];
697       }
698
699       argv[command.size()] = NULL;
700
701       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
702       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
703
704       // Create the environment for the new process. Note (RB): Here we change the environment for
705       // the process launched in local. It would seem more logical to set the environment for the
706       // remote process.
707       Environnement env = _job.getEnvironnement();
708
709       char ** envp = NULL;
710       if(env.size() > 0) {
711         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
712         int i = 0;
713         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
714           const string  & key   = (*it).first;
715           const string  & value = (*it).second;
716           ostringstream oss;
717           oss << key << "=" << value;
718           envp[i]         = new char [oss.str().size() + 1];
719           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
720         }
721
722         // assert (i == env.size())
723         envp[i] = NULL;
724       }
725
726       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
727       //int result = execv("/usr/bin/ssh", parmList);
728       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
729       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
730
731
732
733       // On positionne les limites systeme imposees au fils
734       if (param.find(MAXCPUTIME) != param.end()) {
735         int maxcputime = param[MAXCPUTIME];
736         struct rlimit limit;
737         limit.rlim_cur = maxcputime;
738         limit.rlim_max = int(maxcputime * 1.1);
739         setrlimit(RLIMIT_CPU, &limit);
740       }
741
742       if (param.find(MAXDISKSIZE) != param.end()) {
743         int maxdisksize = param[MAXDISKSIZE];
744         struct rlimit limit;
745         limit.rlim_cur = maxdisksize * 1024;
746         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
747         setrlimit(RLIMIT_FSIZE, &limit);
748       }
749
750       if (param.find(MAXRAMSIZE) != param.end()) {
751         int maxramsize = param[MAXRAMSIZE];
752         struct rlimit limit;
753         limit.rlim_cur = maxramsize * 1024;
754         limit.rlim_max = int(maxramsize * 1.1) * 1024;
755         setrlimit(RLIMIT_AS, &limit);
756       }
757
758
759       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
760       //int result = execv("/usr/bin/ssh", parmList);
761       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
762       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
763
764       // On cree une session pour le fils de facon a ce qu'il ne soit pas
765       // detruit lorsque le shell se termine (le shell ouvre une session et
766       // tue tous les process appartenant a la session en quittant)
767       setsid();
768
769
770       // On ferme les descripteurs de fichiers standards
771       //close(STDIN_FILENO);
772       //close(STDOUT_FILENO);
773       //close(STDERR_FILENO);
774
775
776       // On execute la commande du fils
777       int result = execve(argv[0], argv, envp);
778       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
779       // No need to deallocate since nothing happens after a successful exec
780
781       // Normalement on ne devrait jamais arriver ici
782       ofstream file_err("error.log");
783       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
784
785     } catch (GenericException & e) {
786
787       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
788     }
789
790     exit(99);
791   }
792
793 #else
794
795   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
796   {
797     Parametre param = _job.getParametre();
798     Parametre::iterator it;
799     PROCESS_INFORMATION pi;
800
801     try {
802
803       // EXECUTABLE is MANDATORY, if missing, we throw an exception
804       vector<string> exec_command;
805       if (param.find(EXECUTABLE) != param.end()) {
806         exec_command = _bm.exec_command(param);
807       } else {
808         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
809       }
810
811       // Build the command string from the command argument vector
812       string comstr;
813       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
814         if (i>0) comstr += " ";
815         comstr += exec_command[i];
816       }
817
818       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
819
820       // Create the environment for the new process. Note (RB): Here we change the environment for
821       // the process launched in local. It would seem more logical to set the environment for the
822       // remote process.
823       // Note that if no environment is specified, we reuse the current environment.
824       Environnement env = _job.getEnvironnement();
825       char * chNewEnv = NULL;
826
827       if(env.size() > 0) {
828         chNewEnv = new char[4096];
829         LPTSTR lpszCurrentVariable = chNewEnv;
830         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
831           const string  & key   = (*it).first;
832           const string  & value = (*it).second;
833           string envvar = key + "=" + value;
834           envvar.copy(lpszCurrentVariable, envvar.size());
835           lpszCurrentVariable[envvar.size()] = '\0';
836           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
837         }
838         // Terminate the block with a NULL byte.
839         *lpszCurrentVariable = '\0';
840       }
841
842
843       STARTUPINFO si;
844       ZeroMemory( &si, sizeof(si) );
845       si.cb = sizeof(si);
846       ZeroMemory( &pi, sizeof(pi) );
847
848       // Copy the command to a non-const buffer
849       char * buffer = strdup(comstr.c_str());
850
851       // launch the new process
852       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
853                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
854
855       if (buffer) free(buffer);
856       if (!res) throw RunTimeException("Error while creating new process");
857
858       CloseHandle(pi.hThread);
859
860     } catch (GenericException & e) {
861
862       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
863     }
864
865     return pi.hProcess;
866   }
867
868 #endif
869
870
871   void BatchManager_Local::kill_child_on_exit(void * p_pid)
872   {
873 #ifndef WIN32
874     //TODO: porting of following functionality
875     pid_t child = * static_cast<pid_t *>(p_pid);
876
877     // On tue le fils
878     kill(child, SIGTERM);
879
880     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
881     // mais cette option n'est pas implementee pour le moment, car il est
882     // preferable de laisser le process fils se terminer normalement et seul.
883 #endif
884   }
885
886   void BatchManager_Local::delete_on_exit(void * arg)
887   {
888     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
889     delete p_ta;
890   }
891
892 }