Salome HOME
e9797fc8e627021b0f161e21de0a9e2d6a5e4519
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53 #include "Batch_IOMutex.hxx"
54 #include "Batch_BatchManager_Local.hxx"
55 #include "Batch_RunTimeException.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65     : BatchManager(parent, host), _connect(0),
66       _protocol(CommunicationProtocol::getInstance(protocolType)),
67       _idCounter(0)
68   {
69     pthread_mutex_init(&_threads_mutex, NULL);
70     pthread_cond_init(&_threadLaunchCondition, NULL);
71   }
72
73   // Destructeur
74   BatchManager_Local::~BatchManager_Local()
75   {
76     pthread_mutex_destroy(&_threads_mutex);
77     pthread_cond_destroy(&_threadLaunchCondition);
78   }
79
80   const CommunicationProtocol & BatchManager_Local::getProtocol() const
81   {
82     return _protocol;
83   }
84
85   // Methode pour le controle des jobs : soumet un job au gestionnaire
86   const JobId BatchManager_Local::submitJob(const Job & job)
87   {
88     Job_Local jobLocal = job;
89     Id id = _idCounter++;
90     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
91
92     // Les attributs du thread a sa creation
93     pthread_attr_t thread_attr;
94     pthread_attr_init(&thread_attr);
95     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
96
97     // Creation du thread qui va executer la commande systeme qu'on lui passe
98     pthread_t thread_id;
99     pthread_mutex_lock(&_threads_mutex);
100     int rc = pthread_create(&thread_id,
101       &thread_attr,
102       &ThreadAdapter::run,
103       static_cast<void *>(p_ta));
104
105     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
106     pthread_attr_destroy(&thread_attr);
107
108     if (rc != 0) {
109       pthread_mutex_unlock(&_threads_mutex);
110       throw RunTimeException("Can't create new thread in BatchManager_Local");
111     }
112
113     pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex);
114     pthread_mutex_unlock(&_threads_mutex);
115
116     ostringstream id_sst;
117     id_sst << id;
118     return JobId(this, id_sst.str());
119   }
120
121   // Methode pour le controle des jobs : retire un job du gestionnaire
122   void BatchManager_Local::deleteJob(const JobId & jobid)
123   {
124     Id id;
125
126     istringstream iss(jobid.getReference());
127     iss >> id;
128
129     // On retrouve le thread_id du thread
130     pthread_t thread_id;
131
132     // @@@ --------> SECTION CRITIQUE <-------- @@@
133     pthread_mutex_lock(&_threads_mutex);
134     if (_threads.find(id) != _threads.end())
135       thread_id = _threads[id].thread_id;
136     pthread_mutex_unlock(&_threads_mutex);
137     // @@@ --------> SECTION CRITIQUE <-------- @@@
138
139     cancel(thread_id);
140   }
141
142   // Methode pour le controle des jobs : suspend un job en file d'attente
143   void BatchManager_Local::holdJob(const JobId & jobid)
144   {
145     Id id;
146     istringstream iss(jobid.getReference());
147     iss >> id;
148
149     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
150
151     // On introduit une commande dans la queue du thread
152     // @@@ --------> SECTION CRITIQUE <-------- @@@
153     pthread_mutex_lock(&_threads_mutex);
154     if (_threads.find(id) != _threads.end())
155       _threads[id].command_queue.push(HOLD);
156     pthread_mutex_unlock(&_threads_mutex);
157     // @@@ --------> SECTION CRITIQUE <-------- @@@
158   }
159
160   // Methode pour le controle des jobs : relache un job suspendu
161   void BatchManager_Local::releaseJob(const JobId & jobid)
162   {
163     Id id;
164     istringstream iss(jobid.getReference());
165     iss >> id;
166
167     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
168
169     // On introduit une commande dans la queue du thread
170     // @@@ --------> SECTION CRITIQUE <-------- @@@
171     pthread_mutex_lock(&_threads_mutex);
172     if (_threads.find(id) != _threads.end())
173       _threads[id].command_queue.push(RELEASE);
174     pthread_mutex_unlock(&_threads_mutex);
175     // @@@ --------> SECTION CRITIQUE <-------- @@@
176   }
177
178
179   // Methode pour le controle des jobs : modifie un job en file d'attente
180   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
181   {
182   }
183
184   // Methode pour le controle des jobs : modifie un job en file d'attente
185   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
186   {
187     alterJob(jobid, param, Environnement());
188   }
189
190   // Methode pour le controle des jobs : modifie un job en file d'attente
191   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
192   {
193     alterJob(jobid, Parametre(), env);
194   }
195
196
197
198   // Methode pour le controle des jobs : renvoie l'etat du job
199   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
200   {
201     Id id;
202     istringstream iss(jobid.getReference());
203     iss >> id;
204
205     Parametre param;
206     Environnement env;
207
208     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
209     // @@@ --------> SECTION CRITIQUE <-------- @@@
210     pthread_mutex_lock(&_threads_mutex);
211     std::map<Id, Child >::iterator pos = _threads.find(id);
212     bool found = (pos != _threads.end());
213     if (found) {
214       param = pos->second.param;
215       env   = pos->second.env;
216     }
217     pthread_mutex_unlock(&_threads_mutex);
218     // @@@ --------> SECTION CRITIQUE <-------- @@@
219     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
220
221     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
222
223     JobInfo_Local ji(param, env);
224     return ji;
225   }
226
227
228
229   // Methode pour le controle des jobs : teste si un job est present en machine
230   bool BatchManager_Local::isRunning(const JobId & jobid)
231   {
232     Id id;
233     istringstream iss(jobid.getReference());
234     iss >> id;
235
236     Status status;
237
238     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
239     // @@@ --------> SECTION CRITIQUE <-------- @@@
240     pthread_mutex_lock(&_threads_mutex);
241     status = _threads[id].status;
242     pthread_mutex_unlock(&_threads_mutex);
243     // @@@ --------> SECTION CRITIQUE <-------- @@@
244     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
245
246     return (status == RUNNING);
247   }
248
249   // Methode de destruction d'un job
250   void BatchManager_Local::cancel(pthread_t thread_id)
251   {
252     pthread_cancel(thread_id);
253   }
254
255
256   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
257   {
258     ostringstream exec_sub_cmd;
259
260 #ifdef WIN32
261     char drive[_MAX_DRIVE];
262     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
263     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
264 #endif
265
266     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
267
268     if (param.find(ARGUMENTS) != param.end()) {
269       Versatile V = param[ARGUMENTS];
270       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
271         StringType argt = * static_cast<StringType *>(*it);
272         string     arg  = argt;
273         exec_sub_cmd << " " << arg;
274       }
275     }
276
277     if (param.find(INFILE) != param.end()) {
278       Versatile V = param[INFILE];
279       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
280         Couple cpl = * static_cast<CoupleType*>(*it);
281         string remote = cpl.getRemote();
282         if (remote == "stdin")
283         exec_sub_cmd << " <stdin";
284       }
285     }
286
287     if (param.find(OUTFILE) != param.end()) {
288       Versatile V = param[OUTFILE];
289       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
290         Couple cpl = * static_cast<CoupleType*>(*it);
291         string remote = cpl.getRemote();
292         if (remote == "stdout") exec_sub_cmd << " 1>stdout";
293         if (remote == "stderr") exec_sub_cmd << " 2>stderr";
294       }
295     }
296
297     string user;
298     Parametre::const_iterator it = param.find(USER);
299     if (it != param.end()) {
300       user = string(it->second);
301     }
302
303     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
304   }
305
306
307
308   // Constructeur de la classe ThreadAdapter
309   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
310   _bm(bm), _job(job), _id(id)
311   {
312     // Nothing to do
313   }
314
315
316
317   // Methode d'execution du thread
318   void * BatchManager_Local::ThreadAdapter::run(void * arg)
319   {
320 #ifndef WIN32
321     // On bloque tous les signaux pour ce thread
322     sigset_t setmask;
323     sigfillset(&setmask);
324     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
325 #endif
326
327     // On autorise la terminaison differee du thread
328     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
329     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
330     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
331
332     // On enregistre la fonction de suppression du fils en cas d'arret du thread
333     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
334     // sera prise en compte par pthread_testcancel()
335     Process child;
336     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
337     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
338
339     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
340
341
342
343
344     // Le code retour cumule (ORed) de tous les appels
345     // Nul en cas de reussite de l'ensemble des operations
346     int rc = 0;
347
348     // Cette table contient la liste des fichiers a detruire a la fin du processus
349     std::vector<string> files_to_delete;
350
351
352
353     // On copie les fichiers d'entree pour le fils
354     const Parametre param   = p_ta->_job.getParametre();
355     Parametre::const_iterator it;
356
357     // On initialise la variable workdir a la valeur du Current Working Directory
358     char * cwd =
359 #ifdef WIN32
360       _getcwd(NULL, 0);
361 #else
362       new char [PATH_MAX];
363     getcwd(cwd, PATH_MAX);
364 #endif
365     string workdir = cwd;
366     delete [] cwd;
367
368     if ( (it = param.find(WORKDIR)) != param.end() ) {
369       workdir = static_cast<string>( (*it).second );
370     }
371
372     string executionhost = string(param[EXECUTIONHOST]);
373     string user;
374     if ( (it = param.find(USER)) != param.end() ) {
375       user = string(it->second);
376     }
377
378     if ( (it = param.find(INFILE)) != param.end() ) {
379       Versatile V = (*it).second;
380       Versatile::iterator Vit;
381
382       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
383         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
384         Couple cp       = cpt;
385         string local    = cp.getLocal();
386         string remote   = cp.getRemote();
387
388         std::cerr << workdir << std::endl;
389         std::cerr << remote << std::endl;
390
391         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
392                                                                     workdir + "/" + remote,
393                                                                     executionhost, user);
394         if (status) {
395           // Echec de la copie
396           rc |= 1;
397         } else {
398           // On enregistre le fichier comme etant a detruire
399           files_to_delete.push_back(workdir + "/" + remote);
400         }
401
402       }
403     }
404
405
406
407
408     // On forke/exec un nouveau process pour pouvoir controler le fils
409     // (plus finement qu'avec un appel system)
410     // int rc = system(commande.c_str());
411     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
412     //execv("/usr/bin/ssh", parmList);
413 #ifdef WIN32
414     child = p_ta->launchWin32ChildProcess();
415     p_ta->pere(child);
416 #else
417     child = fork();
418     if (child < 0) { // erreur
419       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
420
421     } else if (child > 0) { // pere
422       p_ta->pere(child);
423
424     } else { // fils
425       p_ta->fils();
426     }
427 #endif
428
429
430     // On copie les fichiers de sortie du fils
431     if ( (it = param.find(OUTFILE)) != param.end() ) {
432       Versatile V = (*it).second;
433       Versatile::iterator Vit;
434
435       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
436         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
437         Couple cp       = cpt;
438         string local    = cp.getLocal();
439         string remote   = cp.getRemote();
440
441         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
442                                                                     executionhost, user,
443                                                                     local, "", "");
444         if (status) {
445           // Echec de la copie
446           rc |= 1;
447         } else {
448           // On enregistre le fichier comme etant a detruire
449           files_to_delete.push_back(workdir + "/" + remote);
450         }
451
452       }
453     }
454
455     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
456     // ou si la creation du fils n'a pu avoir lieu
457     if ( (rc == 0) || (child < 0) ) {
458       std::vector<string>::const_iterator it;
459       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
460         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
461 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
462         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
463 #ifdef WIN32
464         remove_cmd = string("\"") + remove_cmd + string("\"");
465 #endif
466         system(remove_cmd.c_str());*/
467       }
468     }
469
470
471
472     // On retire la fonction de nettoyage de la memoire
473     pthread_cleanup_pop(0);
474
475     // On retire la fonction de suppression du fils
476     pthread_cleanup_pop(0);
477
478
479
480     // On invoque la fonction de nettoyage de la memoire
481     delete_on_exit(arg);
482
483     UNDER_LOCK( cout << "Father is leaving" << endl );
484     pthread_exit(NULL);
485     return NULL;
486   }
487
488
489
490
491   void BatchManager_Local::ThreadAdapter::pere(Process child)
492   {
493     time_t child_starttime = time(NULL);
494
495     // On enregistre le fils dans la table des threads
496     pthread_t thread_id = pthread_self();
497
498     Parametre param   = _job.getParametre();
499     Environnement env = _job.getEnvironnement();
500
501     ostringstream id_sst;
502     id_sst << _id;
503     param[ID]         = id_sst.str();
504     param[STATE]      = Batch::RUNNING;
505 #ifndef WIN32
506     param[PID]        = child;
507 #endif
508
509     _bm._threads[_id].thread_id = thread_id;
510 #ifndef WIN32
511     _bm._threads[_id].pid       = child;
512 #endif
513     _bm._threads[_id].status    = RUNNING;
514     _bm._threads[_id].param     = param;
515     _bm._threads[_id].env       = env;
516     _bm._threads[_id].command_queue.push(NOP);
517
518     // Unlock the master thread. From here, all shared variables must be protected
519     // from concurrent access
520     pthread_cond_signal(&_bm._threadLaunchCondition);
521
522
523     // on boucle en attendant que le fils ait termine
524     while (1) {
525 #ifdef WIN32
526       DWORD exitCode;
527       BOOL res = GetExitCodeProcess(child, &exitCode);
528       if (exitCode != STILL_ACTIVE) {
529         pthread_mutex_lock(&_bm._threads_mutex);
530         _bm._threads[_id].status       = DONE;
531         _bm._threads[_id].param[STATE] = Batch::FINISHED;
532         pthread_mutex_unlock(&_bm._threads_mutex);
533         // @@@ --------> SECTION CRITIQUE <-------- @@@
534         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
535         break;
536       }
537 #else
538       int child_rc = 0;
539       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
540       if (child_wait_rc > 0) {
541          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
542          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
543          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
544          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
545          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
546          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
547          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
548 #ifdef WIFCONTINUED
549          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
550 #endif
551         if (WIFSTOPPED(child_rc)) {
552           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
553           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
554           // desactive car s'il est possible de detecter l'arret d'un process, il est
555           // plus difficile de detecter sa reprise.
556
557           // Le fils est simplement stoppe
558           // @@@ --------> SECTION CRITIQUE <-------- @@@
559           pthread_mutex_lock(&_bm._threads_mutex);
560           _bm._threads[_id].status       = STOPPED;
561           _bm._threads[_id].param[STATE] = Batch::PAUSED;
562           pthread_mutex_unlock(&_bm._threads_mutex);
563           // @@@ --------> SECTION CRITIQUE <-------- @@@
564           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
565
566         }
567         else {
568           // Le fils est termine, on sort de la boucle et du if englobant
569           // @@@ --------> SECTION CRITIQUE <-------- @@@
570           pthread_mutex_lock(&_bm._threads_mutex);
571           _bm._threads[_id].status       = DONE;
572           _bm._threads[_id].param[STATE] = Batch::FINISHED;
573           pthread_mutex_unlock(&_bm._threads_mutex);
574           // @@@ --------> SECTION CRITIQUE <-------- @@@
575           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
576           break;
577         }
578       }
579       else if (child_wait_rc == -1) {
580         // Le fils a disparu ...
581         // @@@ --------> SECTION CRITIQUE <-------- @@@
582         pthread_mutex_lock(&_bm._threads_mutex);
583         _bm._threads[_id].status       = DEAD;
584         _bm._threads[_id].param[STATE] = Batch::FAILED;
585         pthread_mutex_unlock(&_bm._threads_mutex);
586         // @@@ --------> SECTION CRITIQUE <-------- @@@
587         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
588         break;
589       }
590 #endif
591
592       // On teste si le thread doit etre detruit
593       pthread_testcancel();
594
595
596
597       // On regarde si le fils n'a pas depasse son temps (wallclock time)
598       time_t child_currenttime = time(NULL);
599       time_t child_elapsedtime = child_currenttime - child_starttime;
600       if (param.find(MAXWALLTIME) != param.end()) {
601         int maxwalltime = param[MAXWALLTIME];
602         //        cout << "child_starttime          = " << child_starttime        << endl
603         //             << "child_currenttime        = " << child_currenttime      << endl
604         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
605         //             << "maxwalltime              = " << maxwalltime            << endl
606         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
607         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
608           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
609           // On introduit une commande dans la queue du thread
610           // @@@ --------> SECTION CRITIQUE <-------- @@@
611           pthread_mutex_lock(&_bm._threads_mutex);
612           if (_bm._threads.find(_id) != _bm._threads.end())
613             _bm._threads[_id].command_queue.push(KILL);
614           pthread_mutex_unlock(&_bm._threads_mutex);
615           // @@@ --------> SECTION CRITIQUE <-------- @@@
616
617
618         } else if (child_elapsedtime > maxwalltime ) {
619           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
620           // On introduit une commande dans la queue du thread
621           // @@@ --------> SECTION CRITIQUE <-------- @@@
622           pthread_mutex_lock(&_bm._threads_mutex);
623           if (_bm._threads.find(_id) != _bm._threads.end())
624             _bm._threads[_id].command_queue.push(TERM);
625           pthread_mutex_unlock(&_bm._threads_mutex);
626           // @@@ --------> SECTION CRITIQUE <-------- @@@
627         }
628       }
629
630
631
632       // On regarde s'il y a quelque chose a faire dans la queue de commande
633       // @@@ --------> SECTION CRITIQUE <-------- @@@
634       pthread_mutex_lock(&_bm._threads_mutex);
635       if (_bm._threads.find(_id) != _bm._threads.end()) {
636         while (_bm._threads[_id].command_queue.size() > 0) {
637           Commande cmd = _bm._threads[_id].command_queue.front();
638           _bm._threads[_id].command_queue.pop();
639
640           switch (cmd) {
641     case NOP:
642       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
643       break;
644 #ifndef WIN32
645     case HOLD:
646       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
647       kill(child, SIGSTOP);
648       break;
649
650     case RELEASE:
651       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
652       kill(child, SIGCONT);
653       break;
654
655     case TERM:
656       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
657       kill(child, SIGTERM);
658       break;
659
660     case KILL:
661       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
662       kill(child, SIGKILL);
663       break;
664 #endif
665     case ALTER:
666       break;
667
668     default:
669       break;
670           }
671         }
672
673       }
674       pthread_mutex_unlock(&_bm._threads_mutex);
675       // @@@ --------> SECTION CRITIQUE <-------- @@@
676
677       // On fait une petite pause pour ne pas surcharger inutilement le processeur
678 #ifdef WIN32
679       Sleep(1000);
680 #else
681       sleep(1);
682 #endif
683
684     }
685
686   }
687
688
689
690 #ifndef WIN32
691
692   void BatchManager_Local::ThreadAdapter::fils()
693   {
694     Parametre param = _job.getParametre();
695     Parametre::iterator it;
696
697       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
698       //int result = execv("/usr/bin/ssh", parmList);
699       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
700       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
701     try {
702
703       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
704       vector<string> command;
705       if (param.find(EXECUTABLE) != param.end()) {
706         command = _bm.exec_command(param);
707       } else exit(1);
708
709       // Build the argument array argv from the command
710       char ** argv = new char * [command.size() + 1];
711       string comstr;
712       for (string::size_type i=0 ; i<command.size() ; i++) {
713         argv[i] = new char[command[i].size() + 1];
714         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
715         if (i>0) comstr += " # ";
716         comstr += command[i];
717       }
718
719       argv[command.size()] = NULL;
720
721       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
722       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
723
724       // Create the environment for the new process. Note (RB): Here we change the environment for
725       // the process launched in local. It would seem more logical to set the environment for the
726       // remote process.
727       Environnement env = _job.getEnvironnement();
728
729       char ** envp = NULL;
730       if(env.size() > 0) {
731         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
732         int i = 0;
733         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
734           const string  & key   = (*it).first;
735           const string  & value = (*it).second;
736           ostringstream oss;
737           oss << key << "=" << value;
738           envp[i]         = new char [oss.str().size() + 1];
739           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
740         }
741
742         // assert (i == env.size())
743         envp[i] = NULL;
744       }
745
746       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
747       //int result = execv("/usr/bin/ssh", parmList);
748       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
749       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
750
751
752
753       // On positionne les limites systeme imposees au fils
754       if (param.find(MAXCPUTIME) != param.end()) {
755         int maxcputime = param[MAXCPUTIME];
756         struct rlimit limit;
757         limit.rlim_cur = maxcputime;
758         limit.rlim_max = int(maxcputime * 1.1);
759         setrlimit(RLIMIT_CPU, &limit);
760       }
761
762       if (param.find(MAXDISKSIZE) != param.end()) {
763         int maxdisksize = param[MAXDISKSIZE];
764         struct rlimit limit;
765         limit.rlim_cur = maxdisksize * 1024;
766         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
767         setrlimit(RLIMIT_FSIZE, &limit);
768       }
769
770       if (param.find(MAXRAMSIZE) != param.end()) {
771         int maxramsize = param[MAXRAMSIZE];
772         struct rlimit limit;
773         limit.rlim_cur = maxramsize * 1024;
774         limit.rlim_max = int(maxramsize * 1.1) * 1024;
775         setrlimit(RLIMIT_AS, &limit);
776       }
777
778
779       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
780       //int result = execv("/usr/bin/ssh", parmList);
781       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
782       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
783
784       // On cree une session pour le fils de facon a ce qu'il ne soit pas
785       // detruit lorsque le shell se termine (le shell ouvre une session et
786       // tue tous les process appartenant a la session en quittant)
787       setsid();
788
789
790       // On ferme les descripteurs de fichiers standards
791       //close(STDIN_FILENO);
792       //close(STDOUT_FILENO);
793       //close(STDERR_FILENO);
794
795
796       // On execute la commande du fils
797       int result = execve(argv[0], argv, envp);
798       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
799       // No need to deallocate since nothing happens after a successful exec
800
801       // Normalement on ne devrait jamais arriver ici
802       ofstream file_err("error.log");
803       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
804
805     } catch (GenericException & e) {
806
807       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
808     }
809
810     exit(99);
811   }
812
813 #else
814
815   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
816   {
817     Parametre param = _job.getParametre();
818     Parametre::iterator it;
819     PROCESS_INFORMATION pi;
820
821     try {
822
823       // EXECUTABLE is MANDATORY, if missing, we throw an exception
824       vector<string> exec_command;
825       if (param.find(EXECUTABLE) != param.end()) {
826         exec_command = _bm.exec_command(param);
827       } else {
828         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
829       }
830
831       // Build the command string from the command argument vector
832       string comstr;
833       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
834         if (i>0) comstr += " ";
835         comstr += exec_command[i];
836       }
837
838       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
839
840       // Create the environment for the new process. Note (RB): Here we change the environment for
841       // the process launched in local. It would seem more logical to set the environment for the
842       // remote process.
843       // Note that if no environment is specified, we reuse the current environment.
844       Environnement env = _job.getEnvironnement();
845       char * chNewEnv = NULL;
846
847       if(env.size() > 0) {
848         chNewEnv = new char[4096];
849         LPTSTR lpszCurrentVariable = chNewEnv;
850         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
851           const string  & key   = (*it).first;
852           const string  & value = (*it).second;
853           string envvar = key + "=" + value;
854           envvar.copy(lpszCurrentVariable, envvar.size());
855           lpszCurrentVariable[envvar.size()] = '\0';
856           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
857         }
858         // Terminate the block with a NULL byte.
859         *lpszCurrentVariable = '\0';
860       }
861
862
863       STARTUPINFO si;
864       ZeroMemory( &si, sizeof(si) );
865       si.cb = sizeof(si);
866       ZeroMemory( &pi, sizeof(pi) );
867
868       // Copy the command to a non-const buffer
869       char * buffer = strdup(comstr.c_str());
870
871       // launch the new process
872       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
873                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
874
875       if (buffer) free(buffer);
876       if (!res) throw RunTimeException("Error while creating new process");
877
878       CloseHandle(pi.hThread);
879
880     } catch (GenericException & e) {
881
882       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
883     }
884
885     return pi.hProcess;
886   }
887
888 #endif
889
890
891   void BatchManager_Local::kill_child_on_exit(void * p_pid)
892   {
893 #ifndef WIN32
894     //TODO: porting of following functionality
895     pid_t child = * static_cast<pid_t *>(p_pid);
896
897     // On tue le fils
898     kill(child, SIGTERM);
899
900     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
901     // mais cette option n'est pas implementee pour le moment, car il est
902     // preferable de laisser le process fils se terminer normalement et seul.
903 #endif
904   }
905
906   void BatchManager_Local::delete_on_exit(void * arg)
907   {
908     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
909     delete p_ta;
910   }
911
912 }