Salome HOME
Fixed another bug with threads in BatchManager_Local: no more segfault when destroyin...
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53 #include "Batch_IOMutex.hxx"
54 #include "Batch_BatchManager_Local.hxx"
55 #include "Batch_RunTimeException.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65     : BatchManager(parent, host), _connect(0),
66       _protocol(CommunicationProtocol::getInstance(protocolType)),
67       _idCounter(0)
68   {
69     pthread_mutex_init(&_threads_mutex, NULL);
70     pthread_cond_init(&_threadSyncCondition, NULL);
71   }
72
73   // Destructeur
74   BatchManager_Local::~BatchManager_Local()
75   {
76     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
77       pthread_mutex_lock(&_threads_mutex);
78       string state = iter->second.param[STATE];
79       if (state != FINISHED && state != FAILED) {
80         UNDER_LOCK( cout << "Warning: Job " << iter->first <<
81                             " is not finished, it will now be canceled." << endl );
82         pthread_cancel(iter->second.thread_id);
83         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
84       }
85       pthread_mutex_unlock(&_threads_mutex);
86     }
87     pthread_mutex_destroy(&_threads_mutex);
88     pthread_cond_destroy(&_threadSyncCondition);
89   }
90
91   const CommunicationProtocol & BatchManager_Local::getProtocol() const
92   {
93     return _protocol;
94   }
95
96   // Methode pour le controle des jobs : soumet un job au gestionnaire
97   const JobId BatchManager_Local::submitJob(const Job & job)
98   {
99     Job_Local jobLocal = job;
100     Id id = _idCounter++;
101     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
102
103     // Les attributs du thread a sa creation
104     pthread_attr_t thread_attr;
105     pthread_attr_init(&thread_attr);
106     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
107
108     // Creation du thread qui va executer la commande systeme qu'on lui passe
109     pthread_t thread_id;
110     pthread_mutex_lock(&_threads_mutex);
111     int rc = pthread_create(&thread_id,
112       &thread_attr,
113       &ThreadAdapter::run,
114       static_cast<void *>(p_ta));
115
116     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
117     pthread_attr_destroy(&thread_attr);
118
119     if (rc != 0) {
120       pthread_mutex_unlock(&_threads_mutex);
121       throw RunTimeException("Can't create new thread in BatchManager_Local");
122     }
123
124     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
125     pthread_mutex_unlock(&_threads_mutex);
126
127     ostringstream id_sst;
128     id_sst << id;
129     return JobId(this, id_sst.str());
130   }
131
132   // Methode pour le controle des jobs : retire un job du gestionnaire
133   void BatchManager_Local::deleteJob(const JobId & jobid)
134   {
135     Id id;
136
137     istringstream iss(jobid.getReference());
138     iss >> id;
139
140     // On retrouve le thread_id du thread
141     pthread_t thread_id;
142
143     // @@@ --------> SECTION CRITIQUE <-------- @@@
144     pthread_mutex_lock(&_threads_mutex);
145     if (_threads.find(id) != _threads.end())
146       thread_id = _threads[id].thread_id;
147     pthread_mutex_unlock(&_threads_mutex);
148     // @@@ --------> SECTION CRITIQUE <-------- @@@
149
150     cancel(thread_id);
151   }
152
153   // Methode pour le controle des jobs : suspend un job en file d'attente
154   void BatchManager_Local::holdJob(const JobId & jobid)
155   {
156     Id id;
157     istringstream iss(jobid.getReference());
158     iss >> id;
159
160     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
161
162     // On introduit une commande dans la queue du thread
163     // @@@ --------> SECTION CRITIQUE <-------- @@@
164     pthread_mutex_lock(&_threads_mutex);
165     if (_threads.find(id) != _threads.end())
166       _threads[id].command_queue.push(HOLD);
167     pthread_mutex_unlock(&_threads_mutex);
168     // @@@ --------> SECTION CRITIQUE <-------- @@@
169   }
170
171   // Methode pour le controle des jobs : relache un job suspendu
172   void BatchManager_Local::releaseJob(const JobId & jobid)
173   {
174     Id id;
175     istringstream iss(jobid.getReference());
176     iss >> id;
177
178     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
179
180     // On introduit une commande dans la queue du thread
181     // @@@ --------> SECTION CRITIQUE <-------- @@@
182     pthread_mutex_lock(&_threads_mutex);
183     if (_threads.find(id) != _threads.end())
184       _threads[id].command_queue.push(RELEASE);
185     pthread_mutex_unlock(&_threads_mutex);
186     // @@@ --------> SECTION CRITIQUE <-------- @@@
187   }
188
189
190   // Methode pour le controle des jobs : modifie un job en file d'attente
191   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
192   {
193   }
194
195   // Methode pour le controle des jobs : modifie un job en file d'attente
196   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
197   {
198     alterJob(jobid, param, Environnement());
199   }
200
201   // Methode pour le controle des jobs : modifie un job en file d'attente
202   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
203   {
204     alterJob(jobid, Parametre(), env);
205   }
206
207
208
209   // Methode pour le controle des jobs : renvoie l'etat du job
210   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
211   {
212     Id id;
213     istringstream iss(jobid.getReference());
214     iss >> id;
215
216     Parametre param;
217     Environnement env;
218
219     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
220     // @@@ --------> SECTION CRITIQUE <-------- @@@
221     pthread_mutex_lock(&_threads_mutex);
222     std::map<Id, Child >::iterator pos = _threads.find(id);
223     bool found = (pos != _threads.end());
224     if (found) {
225       param = pos->second.param;
226       env   = pos->second.env;
227     }
228     pthread_mutex_unlock(&_threads_mutex);
229     // @@@ --------> SECTION CRITIQUE <-------- @@@
230     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
231
232     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
233
234     JobInfo_Local ji(param, env);
235     return ji;
236   }
237
238
239
240   // Methode pour le controle des jobs : teste si un job est present en machine
241   bool BatchManager_Local::isRunning(const JobId & jobid)
242   {
243     Id id;
244     istringstream iss(jobid.getReference());
245     iss >> id;
246
247     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
248     // @@@ --------> SECTION CRITIQUE <-------- @@@
249     pthread_mutex_lock(&_threads_mutex);
250     bool running = (_threads[id].param[STATE].str() == RUNNING);
251     pthread_mutex_unlock(&_threads_mutex);
252     // @@@ --------> SECTION CRITIQUE <-------- @@@
253     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
254
255     return running;
256   }
257
258   // Methode de destruction d'un job
259   void BatchManager_Local::cancel(pthread_t thread_id)
260   {
261     pthread_mutex_lock(&_threads_mutex);
262     pthread_cancel(thread_id);
263     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
264     pthread_mutex_unlock(&_threads_mutex);
265   }
266
267
268   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
269   {
270     ostringstream exec_sub_cmd;
271
272 #ifdef WIN32
273     char drive[_MAX_DRIVE];
274     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
275     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
276 #endif
277
278     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
279
280     if (param.find(ARGUMENTS) != param.end()) {
281       Versatile V = param[ARGUMENTS];
282       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
283         StringType argt = * static_cast<StringType *>(*it);
284         string     arg  = argt;
285         exec_sub_cmd << " " << arg;
286       }
287     }
288
289     if (param.find(INFILE) != param.end()) {
290       Versatile V = param[INFILE];
291       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
292         Couple cpl = * static_cast<CoupleType*>(*it);
293         string remote = cpl.getRemote();
294         if (remote == "stdin")
295         exec_sub_cmd << " <stdin";
296       }
297     }
298
299     if (param.find(OUTFILE) != param.end()) {
300       Versatile V = param[OUTFILE];
301       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
302         Couple cpl = * static_cast<CoupleType*>(*it);
303         string remote = cpl.getRemote();
304         if (remote == "stdout") exec_sub_cmd << " 1>stdout";
305         if (remote == "stderr") exec_sub_cmd << " 2>stderr";
306       }
307     }
308
309     string user;
310     Parametre::const_iterator it = param.find(USER);
311     if (it != param.end()) {
312       user = string(it->second);
313     }
314
315     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
316   }
317
318
319
320   // Constructeur de la classe ThreadAdapter
321   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
322   _bm(bm), _job(job), _id(id)
323   {
324     // Nothing to do
325   }
326
327
328
329   // Methode d'execution du thread
330   void * BatchManager_Local::ThreadAdapter::run(void * arg)
331   {
332     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
333
334 #ifndef WIN32
335     // On bloque tous les signaux pour ce thread
336     sigset_t setmask;
337     sigfillset(&setmask);
338     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
339 #endif
340
341     // On autorise la terminaison differee du thread
342     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
343     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
344     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
345
346     // On enregistre la fonction de suppression du fils en cas d'arret du thread
347     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
348     // sera prise en compte par pthread_testcancel()
349     Process child;
350     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
351     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
352     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
353
354
355     // Le code retour cumule (ORed) de tous les appels
356     // Nul en cas de reussite de l'ensemble des operations
357     int rc = 0;
358
359     // Cette table contient la liste des fichiers a detruire a la fin du processus
360     std::vector<string> files_to_delete;
361
362
363
364     // On copie les fichiers d'entree pour le fils
365     const Parametre param   = p_ta->_job.getParametre();
366     Parametre::const_iterator it;
367
368     // On initialise la variable workdir a la valeur du Current Working Directory
369     char * cwd =
370 #ifdef WIN32
371       _getcwd(NULL, 0);
372 #else
373       new char [PATH_MAX];
374     getcwd(cwd, PATH_MAX);
375 #endif
376     string workdir = cwd;
377     delete [] cwd;
378
379     if ( (it = param.find(WORKDIR)) != param.end() ) {
380       workdir = static_cast<string>( (*it).second );
381     }
382
383     string executionhost = string(param[EXECUTIONHOST]);
384     string user;
385     if ( (it = param.find(USER)) != param.end() ) {
386       user = string(it->second);
387     }
388
389     if ( (it = param.find(INFILE)) != param.end() ) {
390       Versatile V = (*it).second;
391       Versatile::iterator Vit;
392
393       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
394         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
395         Couple cp       = cpt;
396         string local    = cp.getLocal();
397         string remote   = cp.getRemote();
398
399         std::cerr << workdir << std::endl;
400         std::cerr << remote << std::endl;
401
402         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
403                                                                     workdir + "/" + remote,
404                                                                     executionhost, user);
405         if (status) {
406           // Echec de la copie
407           rc |= 1;
408         } else {
409           // On enregistre le fichier comme etant a detruire
410           files_to_delete.push_back(workdir + "/" + remote);
411         }
412
413       }
414     }
415
416
417
418
419     // On forke/exec un nouveau process pour pouvoir controler le fils
420     // (plus finement qu'avec un appel system)
421     // int rc = system(commande.c_str());
422     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
423     //execv("/usr/bin/ssh", parmList);
424 #ifdef WIN32
425     child = p_ta->launchWin32ChildProcess();
426     p_ta->pere(child);
427 #else
428     child = fork();
429     if (child < 0) { // erreur
430       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
431
432     } else if (child > 0) { // pere
433       p_ta->pere(child);
434
435     } else { // fils
436       p_ta->fils();
437     }
438 #endif
439
440
441     // On copie les fichiers de sortie du fils
442     if ( (it = param.find(OUTFILE)) != param.end() ) {
443       Versatile V = (*it).second;
444       Versatile::iterator Vit;
445
446       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
447         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
448         Couple cp       = cpt;
449         string local    = cp.getLocal();
450         string remote   = cp.getRemote();
451
452         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
453                                                                     executionhost, user,
454                                                                     local, "", "");
455         if (status) {
456           // Echec de la copie
457           rc |= 1;
458         } else {
459           // On enregistre le fichier comme etant a detruire
460           files_to_delete.push_back(workdir + "/" + remote);
461         }
462
463       }
464     }
465
466     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
467     // ou si la creation du fils n'a pu avoir lieu
468     if ( (rc == 0) || (child < 0) ) {
469       std::vector<string>::const_iterator it;
470       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
471         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
472 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
473         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
474 #ifdef WIN32
475         remove_cmd = string("\"") + remove_cmd + string("\"");
476 #endif
477         system(remove_cmd.c_str());*/
478       }
479     }
480
481     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
482
483     // Set the job state to FINISHED or FAILED
484     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
485
486     // On retire la fonction de nettoyage de la memoire
487     pthread_cleanup_pop(0);
488
489     // On retire la fonction de suppression du fils
490     pthread_cleanup_pop(0);
491
492     // remove setFailedOnCancel function from cancel stack
493     pthread_cleanup_pop(0);
494
495     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
496
497     // On invoque la fonction de nettoyage de la memoire
498     delete_on_exit(arg);
499
500     UNDER_LOCK( cout << "Father is leaving" << endl );
501     pthread_exit(NULL);
502     return NULL;
503   }
504
505
506
507
508   void BatchManager_Local::ThreadAdapter::pere(Process child)
509   {
510     time_t child_starttime = time(NULL);
511
512     // On enregistre le fils dans la table des threads
513     pthread_t thread_id = pthread_self();
514
515     Parametre param   = _job.getParametre();
516     Environnement env = _job.getEnvironnement();
517
518     ostringstream id_sst;
519     id_sst << _id;
520     param[ID]         = id_sst.str();
521     param[STATE]      = Batch::RUNNING;
522 #ifndef WIN32
523     param[PID]        = child;
524 #endif
525
526     _bm._threads[_id].thread_id = thread_id;
527 #ifndef WIN32
528     _bm._threads[_id].pid       = child;
529 #endif
530     _bm._threads[_id].hasFailed = false;
531     _bm._threads[_id].param     = param;
532     _bm._threads[_id].env       = env;
533     _bm._threads[_id].command_queue.push(NOP);
534
535     // Unlock the master thread. From here, all shared variables must be protected
536     // from concurrent access
537     pthread_cond_signal(&_bm._threadSyncCondition);
538
539
540     // on boucle en attendant que le fils ait termine
541     while (1) {
542 #ifdef WIN32
543       DWORD exitCode;
544       BOOL res = GetExitCodeProcess(child, &exitCode);
545       if (exitCode != STILL_ACTIVE) {
546         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
547         break;
548       }
549 #else
550       int child_rc = 0;
551       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
552       if (child_wait_rc > 0) {
553          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
554          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
555          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
556          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
557          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
558          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
559          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
560 #ifdef WIFCONTINUED
561          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
562 #endif
563         if (WIFSTOPPED(child_rc)) {
564           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
565           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
566           // desactive car s'il est possible de detecter l'arret d'un process, il est
567           // plus difficile de detecter sa reprise.
568
569           // Le fils est simplement stoppe
570           // @@@ --------> SECTION CRITIQUE <-------- @@@
571           pthread_mutex_lock(&_bm._threads_mutex);
572           _bm._threads[_id].param[STATE] = Batch::PAUSED;
573           pthread_mutex_unlock(&_bm._threads_mutex);
574           // @@@ --------> SECTION CRITIQUE <-------- @@@
575           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
576
577         }
578         else {
579           // Le fils est termine, on sort de la boucle et du if englobant
580           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
581           break;
582         }
583       }
584       else if (child_wait_rc == -1) {
585         // Le fils a disparu ...
586         // @@@ --------> SECTION CRITIQUE <-------- @@@
587         pthread_mutex_lock(&_bm._threads_mutex);
588         _bm._threads[_id].hasFailed = true;
589         pthread_mutex_unlock(&_bm._threads_mutex);
590         // @@@ --------> SECTION CRITIQUE <-------- @@@
591         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
592         break;
593       }
594 #endif
595
596       // On teste si le thread doit etre detruit
597       pthread_testcancel();
598
599
600
601       // On regarde si le fils n'a pas depasse son temps (wallclock time)
602       time_t child_currenttime = time(NULL);
603       time_t child_elapsedtime = child_currenttime - child_starttime;
604       if (param.find(MAXWALLTIME) != param.end()) {
605         int maxwalltime = param[MAXWALLTIME];
606         //        cout << "child_starttime          = " << child_starttime        << endl
607         //             << "child_currenttime        = " << child_currenttime      << endl
608         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
609         //             << "maxwalltime              = " << maxwalltime            << endl
610         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
611         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
612           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
613           // On introduit une commande dans la queue du thread
614           // @@@ --------> SECTION CRITIQUE <-------- @@@
615           pthread_mutex_lock(&_bm._threads_mutex);
616           if (_bm._threads.find(_id) != _bm._threads.end())
617             _bm._threads[_id].command_queue.push(KILL);
618           pthread_mutex_unlock(&_bm._threads_mutex);
619           // @@@ --------> SECTION CRITIQUE <-------- @@@
620
621
622         } else if (child_elapsedtime > maxwalltime ) {
623           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
624           // On introduit une commande dans la queue du thread
625           // @@@ --------> SECTION CRITIQUE <-------- @@@
626           pthread_mutex_lock(&_bm._threads_mutex);
627           if (_bm._threads.find(_id) != _bm._threads.end())
628             _bm._threads[_id].command_queue.push(TERM);
629           pthread_mutex_unlock(&_bm._threads_mutex);
630           // @@@ --------> SECTION CRITIQUE <-------- @@@
631         }
632       }
633
634
635
636       // On regarde s'il y a quelque chose a faire dans la queue de commande
637       // @@@ --------> SECTION CRITIQUE <-------- @@@
638       pthread_mutex_lock(&_bm._threads_mutex);
639       if (_bm._threads.find(_id) != _bm._threads.end()) {
640         while (_bm._threads[_id].command_queue.size() > 0) {
641           Commande cmd = _bm._threads[_id].command_queue.front();
642           _bm._threads[_id].command_queue.pop();
643
644           switch (cmd) {
645     case NOP:
646       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
647       break;
648 #ifndef WIN32
649     case HOLD:
650       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
651       kill(child, SIGSTOP);
652       break;
653
654     case RELEASE:
655       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
656       kill(child, SIGCONT);
657       break;
658
659     case TERM:
660       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
661       kill(child, SIGTERM);
662       break;
663
664     case KILL:
665       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
666       kill(child, SIGKILL);
667       break;
668 #endif
669     case ALTER:
670       break;
671
672     default:
673       break;
674           }
675         }
676
677       }
678       pthread_mutex_unlock(&_bm._threads_mutex);
679       // @@@ --------> SECTION CRITIQUE <-------- @@@
680
681       // On fait une petite pause pour ne pas surcharger inutilement le processeur
682 #ifdef WIN32
683       Sleep(1000);
684 #else
685       sleep(1);
686 #endif
687
688     }
689
690   }
691
692
693
694 #ifndef WIN32
695
696   void BatchManager_Local::ThreadAdapter::fils()
697   {
698     Parametre param = _job.getParametre();
699     Parametre::iterator it;
700
701       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
702       //int result = execv("/usr/bin/ssh", parmList);
703       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
704       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
705     try {
706
707       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
708       vector<string> command;
709       if (param.find(EXECUTABLE) != param.end()) {
710         command = _bm.exec_command(param);
711       } else exit(1);
712
713       // Build the argument array argv from the command
714       char ** argv = new char * [command.size() + 1];
715       string comstr;
716       for (string::size_type i=0 ; i<command.size() ; i++) {
717         argv[i] = new char[command[i].size() + 1];
718         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
719         if (i>0) comstr += " # ";
720         comstr += command[i];
721       }
722
723       argv[command.size()] = NULL;
724
725       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
726       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
727
728       // Create the environment for the new process. Note (RB): Here we change the environment for
729       // the process launched in local. It would seem more logical to set the environment for the
730       // remote process.
731       Environnement env = _job.getEnvironnement();
732
733       char ** envp = NULL;
734       if(env.size() > 0) {
735         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
736         int i = 0;
737         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
738           const string  & key   = (*it).first;
739           const string  & value = (*it).second;
740           ostringstream oss;
741           oss << key << "=" << value;
742           envp[i]         = new char [oss.str().size() + 1];
743           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
744         }
745
746         // assert (i == env.size())
747         envp[i] = NULL;
748       }
749
750       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
751       //int result = execv("/usr/bin/ssh", parmList);
752       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
753       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
754
755
756
757       // On positionne les limites systeme imposees au fils
758       if (param.find(MAXCPUTIME) != param.end()) {
759         int maxcputime = param[MAXCPUTIME];
760         struct rlimit limit;
761         limit.rlim_cur = maxcputime;
762         limit.rlim_max = int(maxcputime * 1.1);
763         setrlimit(RLIMIT_CPU, &limit);
764       }
765
766       if (param.find(MAXDISKSIZE) != param.end()) {
767         int maxdisksize = param[MAXDISKSIZE];
768         struct rlimit limit;
769         limit.rlim_cur = maxdisksize * 1024;
770         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
771         setrlimit(RLIMIT_FSIZE, &limit);
772       }
773
774       if (param.find(MAXRAMSIZE) != param.end()) {
775         int maxramsize = param[MAXRAMSIZE];
776         struct rlimit limit;
777         limit.rlim_cur = maxramsize * 1024;
778         limit.rlim_max = int(maxramsize * 1.1) * 1024;
779         setrlimit(RLIMIT_AS, &limit);
780       }
781
782
783       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
784       //int result = execv("/usr/bin/ssh", parmList);
785       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
786       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
787
788       // On cree une session pour le fils de facon a ce qu'il ne soit pas
789       // detruit lorsque le shell se termine (le shell ouvre une session et
790       // tue tous les process appartenant a la session en quittant)
791       setsid();
792
793
794       // On ferme les descripteurs de fichiers standards
795       //close(STDIN_FILENO);
796       //close(STDOUT_FILENO);
797       //close(STDERR_FILENO);
798
799
800       // On execute la commande du fils
801       int result = execve(argv[0], argv, envp);
802       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
803       // No need to deallocate since nothing happens after a successful exec
804
805       // Normalement on ne devrait jamais arriver ici
806       ofstream file_err("error.log");
807       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
808
809     } catch (GenericException & e) {
810
811       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
812     }
813
814     exit(99);
815   }
816
817 #else
818
819   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
820   {
821     Parametre param = _job.getParametre();
822     Parametre::iterator it;
823     PROCESS_INFORMATION pi;
824
825     try {
826
827       // EXECUTABLE is MANDATORY, if missing, we throw an exception
828       vector<string> exec_command;
829       if (param.find(EXECUTABLE) != param.end()) {
830         exec_command = _bm.exec_command(param);
831       } else {
832         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
833       }
834
835       // Build the command string from the command argument vector
836       string comstr;
837       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
838         if (i>0) comstr += " ";
839         comstr += exec_command[i];
840       }
841
842       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
843
844       // Create the environment for the new process. Note (RB): Here we change the environment for
845       // the process launched in local. It would seem more logical to set the environment for the
846       // remote process.
847       // Note that if no environment is specified, we reuse the current environment.
848       Environnement env = _job.getEnvironnement();
849       char * chNewEnv = NULL;
850
851       if(env.size() > 0) {
852         chNewEnv = new char[4096];
853         LPTSTR lpszCurrentVariable = chNewEnv;
854         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
855           const string  & key   = (*it).first;
856           const string  & value = (*it).second;
857           string envvar = key + "=" + value;
858           envvar.copy(lpszCurrentVariable, envvar.size());
859           lpszCurrentVariable[envvar.size()] = '\0';
860           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
861         }
862         // Terminate the block with a NULL byte.
863         *lpszCurrentVariable = '\0';
864       }
865
866
867       STARTUPINFO si;
868       ZeroMemory( &si, sizeof(si) );
869       si.cb = sizeof(si);
870       ZeroMemory( &pi, sizeof(pi) );
871
872       // Copy the command to a non-const buffer
873       char * buffer = strdup(comstr.c_str());
874
875       // launch the new process
876       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
877                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
878
879       if (buffer) free(buffer);
880       if (!res) throw RunTimeException("Error while creating new process");
881
882       CloseHandle(pi.hThread);
883
884     } catch (GenericException & e) {
885
886       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
887     }
888
889     return pi.hProcess;
890   }
891
892 #endif
893
894
895   void BatchManager_Local::kill_child_on_exit(void * p_pid)
896   {
897 #ifndef WIN32
898     //TODO: porting of following functionality
899     pid_t child = * static_cast<pid_t *>(p_pid);
900
901     // On tue le fils
902     kill(child, SIGTERM);
903
904     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
905     // mais cette option n'est pas implementee pour le moment, car il est
906     // preferable de laisser le process fils se terminer normalement et seul.
907 #endif
908   }
909
910   void BatchManager_Local::delete_on_exit(void * arg)
911   {
912     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
913     delete p_ta;
914   }
915
916   void BatchManager_Local::setFailedOnCancel(void * arg)
917   {
918     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
919     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
920     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
921     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
922
923     // Unlock the master thread. From here, the batch manager instance should not be used.
924     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
925   }
926
927 }