Salome HOME
7e0c0b040116c6153d467456bcf5b9ae5ae01a0b
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2010  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53 #include "Batch_IOMutex.hxx"
54 #include "Batch_BatchManager_Local.hxx"
55 #include "Batch_RunTimeException.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65     : BatchManager(parent, host), _connect(0),
66       _protocol(CommunicationProtocol::getInstance(protocolType)),
67       _idCounter(0)
68   {
69     pthread_mutex_init(&_threads_mutex, NULL);
70     pthread_cond_init(&_threadSyncCondition, NULL);
71   }
72
73   // Destructeur
74   BatchManager_Local::~BatchManager_Local()
75   {
76     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
77       pthread_mutex_lock(&_threads_mutex);
78       string state = iter->second.param[STATE];
79       if (state != FINISHED && state != FAILED) {
80         UNDER_LOCK( cout << "Warning: Job " << iter->first <<
81                             " is not finished, it will now be canceled." << endl );
82         pthread_cancel(iter->second.thread_id);
83         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
84       }
85       pthread_mutex_unlock(&_threads_mutex);
86     }
87     pthread_mutex_destroy(&_threads_mutex);
88     pthread_cond_destroy(&_threadSyncCondition);
89   }
90
91   const CommunicationProtocol & BatchManager_Local::getProtocol() const
92   {
93     return _protocol;
94   }
95
96   // Methode pour le controle des jobs : soumet un job au gestionnaire
97   const JobId BatchManager_Local::submitJob(const Job & job)
98   {
99     Job_Local jobLocal = job;
100     Id id = _idCounter++;
101     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
102
103     // Les attributs du thread a sa creation
104     pthread_attr_t thread_attr;
105     pthread_attr_init(&thread_attr);
106     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
107
108     // Creation du thread qui va executer la commande systeme qu'on lui passe
109     pthread_t thread_id;
110     pthread_mutex_lock(&_threads_mutex);
111     int rc = pthread_create(&thread_id,
112       &thread_attr,
113       &ThreadAdapter::run,
114       static_cast<void *>(p_ta));
115
116     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
117     pthread_attr_destroy(&thread_attr);
118
119     if (rc != 0) {
120       pthread_mutex_unlock(&_threads_mutex);
121       throw RunTimeException("Can't create new thread in BatchManager_Local");
122     }
123
124     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
125     pthread_mutex_unlock(&_threads_mutex);
126
127     ostringstream id_sst;
128     id_sst << id;
129     return JobId(this, id_sst.str());
130   }
131
132   // Methode pour le controle des jobs : retire un job du gestionnaire
133   void BatchManager_Local::deleteJob(const JobId & jobid)
134   {
135     Id id;
136
137     istringstream iss(jobid.getReference());
138     iss >> id;
139
140     // On retrouve le thread_id du thread
141     pthread_t thread_id;
142
143     // @@@ --------> SECTION CRITIQUE <-------- @@@
144     pthread_mutex_lock(&_threads_mutex);
145     bool idFound = (_threads.find(id) != _threads.end());
146     if (idFound) {
147       string state = _threads[id].param[STATE];
148       if (state != FINISHED && state != FAILED) {
149         pthread_cancel(_threads[id].thread_id);
150         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
151       } else {
152         cout << "Cannot delete job " << jobid.getReference() <<
153                 ". Job is already finished." << endl;
154       }
155     }
156     pthread_mutex_unlock(&_threads_mutex);
157     // @@@ --------> SECTION CRITIQUE <-------- @@@
158
159     if (!idFound)
160       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
161   }
162
163   // Methode pour le controle des jobs : suspend un job en file d'attente
164   void BatchManager_Local::holdJob(const JobId & jobid)
165   {
166     Id id;
167     istringstream iss(jobid.getReference());
168     iss >> id;
169
170     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
171
172     // On introduit une commande dans la queue du thread
173     // @@@ --------> SECTION CRITIQUE <-------- @@@
174     pthread_mutex_lock(&_threads_mutex);
175     if (_threads.find(id) != _threads.end())
176       _threads[id].command_queue.push(HOLD);
177     pthread_mutex_unlock(&_threads_mutex);
178     // @@@ --------> SECTION CRITIQUE <-------- @@@
179   }
180
181   // Methode pour le controle des jobs : relache un job suspendu
182   void BatchManager_Local::releaseJob(const JobId & jobid)
183   {
184     Id id;
185     istringstream iss(jobid.getReference());
186     iss >> id;
187
188     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
189
190     // On introduit une commande dans la queue du thread
191     // @@@ --------> SECTION CRITIQUE <-------- @@@
192     pthread_mutex_lock(&_threads_mutex);
193     if (_threads.find(id) != _threads.end())
194       _threads[id].command_queue.push(RELEASE);
195     pthread_mutex_unlock(&_threads_mutex);
196     // @@@ --------> SECTION CRITIQUE <-------- @@@
197   }
198
199
200   // Methode pour le controle des jobs : modifie un job en file d'attente
201   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
202   {
203   }
204
205   // Methode pour le controle des jobs : modifie un job en file d'attente
206   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
207   {
208     alterJob(jobid, param, Environnement());
209   }
210
211   // Methode pour le controle des jobs : modifie un job en file d'attente
212   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
213   {
214     alterJob(jobid, Parametre(), env);
215   }
216
217
218
219   // Methode pour le controle des jobs : renvoie l'etat du job
220   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
221   {
222     Id id;
223     istringstream iss(jobid.getReference());
224     iss >> id;
225
226     Parametre param;
227     Environnement env;
228
229     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
230     // @@@ --------> SECTION CRITIQUE <-------- @@@
231     pthread_mutex_lock(&_threads_mutex);
232     std::map<Id, Child >::iterator pos = _threads.find(id);
233     bool found = (pos != _threads.end());
234     if (found) {
235       param = pos->second.param;
236       env   = pos->second.env;
237     }
238     pthread_mutex_unlock(&_threads_mutex);
239     // @@@ --------> SECTION CRITIQUE <-------- @@@
240     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
241
242     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
243
244     JobInfo_Local ji(param, env);
245     return ji;
246   }
247
248
249   // Ce manager ne peut pas reprendre un job
250   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
251   // au JobId
252   const Batch::JobId
253   BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
254   {
255     return JobId(this, "undefined");
256   }
257
258   // Methode pour le controle des jobs : teste si un job est present en machine
259   bool BatchManager_Local::isRunning(const JobId & jobid)
260   {
261     Id id;
262     istringstream iss(jobid.getReference());
263     iss >> id;
264
265     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
266     // @@@ --------> SECTION CRITIQUE <-------- @@@
267     pthread_mutex_lock(&_threads_mutex);
268     bool running = (_threads[id].param[STATE].str() == RUNNING);
269     pthread_mutex_unlock(&_threads_mutex);
270     // @@@ --------> SECTION CRITIQUE <-------- @@@
271     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
272
273     return running;
274   }
275
276
277   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
278   {
279     ostringstream exec_sub_cmd;
280
281 #ifdef WIN32
282     char drive[_MAX_DRIVE];
283     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
284     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
285 #endif
286
287     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
288
289     if (param.find(ARGUMENTS) != param.end()) {
290       Versatile V = param[ARGUMENTS];
291       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
292         StringType argt = * static_cast<StringType *>(*it);
293         string     arg  = argt;
294         exec_sub_cmd << " " << arg;
295       }
296     }
297
298     if (param.find(INFILE) != param.end()) {
299       Versatile V = param[INFILE];
300       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
301         Couple cpl = * static_cast<CoupleType*>(*it);
302         string remote = cpl.getRemote();
303         if (remote == "stdin")
304         exec_sub_cmd << " <stdin";
305       }
306     }
307
308     if (param.find(OUTFILE) != param.end()) {
309       Versatile V = param[OUTFILE];
310       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
311         Couple cpl = * static_cast<CoupleType*>(*it);
312         string remote = cpl.getRemote();
313         if (remote == "stdout") exec_sub_cmd << " 1>stdout";
314         if (remote == "stderr") exec_sub_cmd << " 2>stderr";
315       }
316     }
317
318     string user;
319     Parametre::const_iterator it = param.find(USER);
320     if (it != param.end()) {
321       user = string(it->second);
322     }
323
324     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
325   }
326
327
328
329   // Constructeur de la classe ThreadAdapter
330   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
331   _bm(bm), _job(job), _id(id)
332   {
333     // Nothing to do
334   }
335
336
337
338   // Methode d'execution du thread
339   void * BatchManager_Local::ThreadAdapter::run(void * arg)
340   {
341     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
342
343 #ifndef WIN32
344     // On bloque tous les signaux pour ce thread
345     sigset_t setmask;
346     sigfillset(&setmask);
347     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
348 #endif
349
350     // On autorise la terminaison differee du thread
351     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
352     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
353     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
354
355     // On enregistre la fonction de suppression du fils en cas d'arret du thread
356     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
357     // sera prise en compte par pthread_testcancel()
358     Process child;
359     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
360     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
361     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
362
363
364     // Le code retour cumule (ORed) de tous les appels
365     // Nul en cas de reussite de l'ensemble des operations
366     int rc = 0;
367
368     // Cette table contient la liste des fichiers a detruire a la fin du processus
369     std::vector<string> files_to_delete;
370
371
372
373     // On copie les fichiers d'entree pour le fils
374     const Parametre param   = p_ta->_job.getParametre();
375     Parametre::const_iterator it;
376
377     // On initialise la variable workdir a la valeur du Current Working Directory
378     char * cwd =
379 #ifdef WIN32
380       _getcwd(NULL, 0);
381 #else
382       new char [PATH_MAX];
383     getcwd(cwd, PATH_MAX);
384 #endif
385     string workdir = cwd;
386     delete [] cwd;
387
388     if ( (it = param.find(WORKDIR)) != param.end() ) {
389       workdir = static_cast<string>( (*it).second );
390     }
391
392     string executionhost = string(param[EXECUTIONHOST]);
393     string user;
394     if ( (it = param.find(USER)) != param.end() ) {
395       user = string(it->second);
396     }
397
398     if ( (it = param.find(INFILE)) != param.end() ) {
399       Versatile V = (*it).second;
400       Versatile::iterator Vit;
401
402       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
403         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
404         Couple cp       = cpt;
405         string local    = cp.getLocal();
406         string remote   = cp.getRemote();
407
408         std::cerr << workdir << std::endl;
409         std::cerr << remote << std::endl;
410
411         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
412                                                                     workdir + "/" + remote,
413                                                                     executionhost, user);
414         if (status) {
415           // Echec de la copie
416           rc |= 1;
417         } else {
418           // On enregistre le fichier comme etant a detruire
419           files_to_delete.push_back(workdir + "/" + remote);
420         }
421
422       }
423     }
424
425
426
427
428     // On forke/exec un nouveau process pour pouvoir controler le fils
429     // (plus finement qu'avec un appel system)
430     // int rc = system(commande.c_str());
431     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
432     //execv("/usr/bin/ssh", parmList);
433 #ifdef WIN32
434     child = p_ta->launchWin32ChildProcess();
435     p_ta->pere(child);
436 #else
437     child = fork();
438     if (child < 0) { // erreur
439       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
440
441     } else if (child > 0) { // pere
442       p_ta->pere(child);
443
444     } else { // fils
445       p_ta->fils();
446     }
447 #endif
448
449
450     // On copie les fichiers de sortie du fils
451     if ( (it = param.find(OUTFILE)) != param.end() ) {
452       Versatile V = (*it).second;
453       Versatile::iterator Vit;
454
455       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
456         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
457         Couple cp       = cpt;
458         string local    = cp.getLocal();
459         string remote   = cp.getRemote();
460
461         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
462                                                                     executionhost, user,
463                                                                     local, "", "");
464         if (status) {
465           // Echec de la copie
466           rc |= 1;
467         } else {
468           // On enregistre le fichier comme etant a detruire
469           files_to_delete.push_back(workdir + "/" + remote);
470         }
471
472       }
473     }
474
475     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
476     // ou si la creation du fils n'a pu avoir lieu
477     if ( (rc == 0) || (child < 0) ) {
478       std::vector<string>::const_iterator it;
479       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
480         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
481 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
482         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
483 #ifdef WIN32
484         remove_cmd = string("\"") + remove_cmd + string("\"");
485 #endif
486         system(remove_cmd.c_str());*/
487       }
488     }
489
490     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
491
492     // Set the job state to FINISHED or FAILED
493     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
494
495     // On retire la fonction de nettoyage de la memoire
496     pthread_cleanup_pop(0);
497
498     // On retire la fonction de suppression du fils
499     pthread_cleanup_pop(0);
500
501     // remove setFailedOnCancel function from cancel stack
502     pthread_cleanup_pop(0);
503
504     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
505
506     // On invoque la fonction de nettoyage de la memoire
507     delete_on_exit(arg);
508
509     UNDER_LOCK( cout << "Father is leaving" << endl );
510     pthread_exit(NULL);
511     return NULL;
512   }
513
514
515
516
517   void BatchManager_Local::ThreadAdapter::pere(Process child)
518   {
519     time_t child_starttime = time(NULL);
520
521     // On enregistre le fils dans la table des threads
522     pthread_t thread_id = pthread_self();
523
524     Parametre param   = _job.getParametre();
525     Environnement env = _job.getEnvironnement();
526
527     ostringstream id_sst;
528     id_sst << _id;
529     param[ID]         = id_sst.str();
530     param[STATE]      = Batch::RUNNING;
531 #ifndef WIN32
532     param[PID]        = child;
533 #endif
534
535     _bm._threads[_id].thread_id = thread_id;
536 #ifndef WIN32
537     _bm._threads[_id].pid       = child;
538 #endif
539     _bm._threads[_id].hasFailed = false;
540     _bm._threads[_id].param     = param;
541     _bm._threads[_id].env       = env;
542     _bm._threads[_id].command_queue.push(NOP);
543
544     // Unlock the master thread. From here, all shared variables must be protected
545     // from concurrent access
546     pthread_cond_signal(&_bm._threadSyncCondition);
547
548
549     // on boucle en attendant que le fils ait termine
550     while (1) {
551 #ifdef WIN32
552       DWORD exitCode;
553       BOOL res = GetExitCodeProcess(child, &exitCode);
554       if (exitCode != STILL_ACTIVE) {
555         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
556         break;
557       }
558 #else
559       int child_rc = 0;
560       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
561       if (child_wait_rc > 0) {
562          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
563          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
564          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
565          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
566          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
567          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
568          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
569 #ifdef WIFCONTINUED
570          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
571 #endif
572         if (WIFSTOPPED(child_rc)) {
573           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
574           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
575           // desactive car s'il est possible de detecter l'arret d'un process, il est
576           // plus difficile de detecter sa reprise.
577
578           // Le fils est simplement stoppe
579           // @@@ --------> SECTION CRITIQUE <-------- @@@
580           pthread_mutex_lock(&_bm._threads_mutex);
581           _bm._threads[_id].param[STATE] = Batch::PAUSED;
582           pthread_mutex_unlock(&_bm._threads_mutex);
583           // @@@ --------> SECTION CRITIQUE <-------- @@@
584           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
585
586         }
587         else {
588           // Le fils est termine, on sort de la boucle et du if englobant
589           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
590           break;
591         }
592       }
593       else if (child_wait_rc == -1) {
594         // Le fils a disparu ...
595         // @@@ --------> SECTION CRITIQUE <-------- @@@
596         pthread_mutex_lock(&_bm._threads_mutex);
597         _bm._threads[_id].hasFailed = true;
598         pthread_mutex_unlock(&_bm._threads_mutex);
599         // @@@ --------> SECTION CRITIQUE <-------- @@@
600         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
601         break;
602       }
603 #endif
604
605       // On teste si le thread doit etre detruit
606       pthread_testcancel();
607
608
609
610       // On regarde si le fils n'a pas depasse son temps (wallclock time)
611       time_t child_currenttime = time(NULL);
612       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
613       if (param.find(MAXWALLTIME) != param.end()) {
614         long maxwalltime = param[MAXWALLTIME];
615         //        cout << "child_starttime          = " << child_starttime        << endl
616         //             << "child_currenttime        = " << child_currenttime      << endl
617         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
618         //             << "maxwalltime              = " << maxwalltime            << endl
619         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
620         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
621           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
622           // On introduit une commande dans la queue du thread
623           // @@@ --------> SECTION CRITIQUE <-------- @@@
624           pthread_mutex_lock(&_bm._threads_mutex);
625           if (_bm._threads.find(_id) != _bm._threads.end())
626             _bm._threads[_id].command_queue.push(KILL);
627           pthread_mutex_unlock(&_bm._threads_mutex);
628           // @@@ --------> SECTION CRITIQUE <-------- @@@
629
630
631         } else if (child_elapsedtime_minutes > maxwalltime ) {
632           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
633           // On introduit une commande dans la queue du thread
634           // @@@ --------> SECTION CRITIQUE <-------- @@@
635           pthread_mutex_lock(&_bm._threads_mutex);
636           if (_bm._threads.find(_id) != _bm._threads.end())
637             _bm._threads[_id].command_queue.push(TERM);
638           pthread_mutex_unlock(&_bm._threads_mutex);
639           // @@@ --------> SECTION CRITIQUE <-------- @@@
640         }
641       }
642
643
644
645       // On regarde s'il y a quelque chose a faire dans la queue de commande
646       // @@@ --------> SECTION CRITIQUE <-------- @@@
647       pthread_mutex_lock(&_bm._threads_mutex);
648       if (_bm._threads.find(_id) != _bm._threads.end()) {
649         while (_bm._threads[_id].command_queue.size() > 0) {
650           Commande cmd = _bm._threads[_id].command_queue.front();
651           _bm._threads[_id].command_queue.pop();
652
653           switch (cmd) {
654     case NOP:
655       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
656       break;
657 #ifndef WIN32
658     case HOLD:
659       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
660       kill(child, SIGSTOP);
661       break;
662
663     case RELEASE:
664       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
665       kill(child, SIGCONT);
666       break;
667
668     case TERM:
669       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
670       kill(child, SIGTERM);
671       break;
672
673     case KILL:
674       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
675       kill(child, SIGKILL);
676       break;
677 #endif
678     case ALTER:
679       break;
680
681     default:
682       break;
683           }
684         }
685
686       }
687       pthread_mutex_unlock(&_bm._threads_mutex);
688       // @@@ --------> SECTION CRITIQUE <-------- @@@
689
690       // On fait une petite pause pour ne pas surcharger inutilement le processeur
691 #ifdef WIN32
692       Sleep(1000);
693 #else
694       sleep(1);
695 #endif
696
697     }
698
699   }
700
701
702
703 #ifndef WIN32
704
705   void BatchManager_Local::ThreadAdapter::fils()
706   {
707     Parametre param = _job.getParametre();
708     Parametre::iterator it;
709
710       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
711       //int result = execv("/usr/bin/ssh", parmList);
712       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
713       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
714     try {
715
716       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
717       vector<string> command;
718       if (param.find(EXECUTABLE) != param.end()) {
719         command = _bm.exec_command(param);
720       } else exit(1);
721
722       // Build the argument array argv from the command
723       char ** argv = new char * [command.size() + 1];
724       string comstr;
725       for (string::size_type i=0 ; i<command.size() ; i++) {
726         argv[i] = new char[command[i].size() + 1];
727         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
728         if (i>0) comstr += " # ";
729         comstr += command[i];
730       }
731
732       argv[command.size()] = NULL;
733
734       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
735       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
736
737       // Create the environment for the new process. Note (RB): Here we change the environment for
738       // the process launched in local. It would seem more logical to set the environment for the
739       // remote process.
740       Environnement env = _job.getEnvironnement();
741
742       char ** envp = NULL;
743       if(env.size() > 0) {
744         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
745         int i = 0;
746         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
747           const string  & key   = (*it).first;
748           const string  & value = (*it).second;
749           ostringstream oss;
750           oss << key << "=" << value;
751           envp[i]         = new char [oss.str().size() + 1];
752           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
753         }
754
755         // assert (i == env.size())
756         envp[i] = NULL;
757       }
758
759       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
760       //int result = execv("/usr/bin/ssh", parmList);
761       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
762       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
763
764
765
766       // On positionne les limites systeme imposees au fils
767       if (param.find(MAXCPUTIME) != param.end()) {
768         int maxcputime = param[MAXCPUTIME];
769         struct rlimit limit;
770         limit.rlim_cur = maxcputime;
771         limit.rlim_max = int(maxcputime * 1.1);
772         setrlimit(RLIMIT_CPU, &limit);
773       }
774
775       if (param.find(MAXDISKSIZE) != param.end()) {
776         int maxdisksize = param[MAXDISKSIZE];
777         struct rlimit limit;
778         limit.rlim_cur = maxdisksize * 1024;
779         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
780         setrlimit(RLIMIT_FSIZE, &limit);
781       }
782
783       if (param.find(MAXRAMSIZE) != param.end()) {
784         int maxramsize = param[MAXRAMSIZE];
785         struct rlimit limit;
786         limit.rlim_cur = maxramsize * 1024 * 1024;
787         limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
788         setrlimit(RLIMIT_AS, &limit);
789       }
790
791
792       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
793       //int result = execv("/usr/bin/ssh", parmList);
794       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
795       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
796
797       // On cree une session pour le fils de facon a ce qu'il ne soit pas
798       // detruit lorsque le shell se termine (le shell ouvre une session et
799       // tue tous les process appartenant a la session en quittant)
800       setsid();
801
802
803       // On ferme les descripteurs de fichiers standards
804       //close(STDIN_FILENO);
805       //close(STDOUT_FILENO);
806       //close(STDERR_FILENO);
807
808
809       // On execute la commande du fils
810       int result = execve(argv[0], argv, envp);
811       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
812       // No need to deallocate since nothing happens after a successful exec
813
814       // Normalement on ne devrait jamais arriver ici
815       ofstream file_err("error.log");
816       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
817
818     } catch (GenericException & e) {
819
820       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
821     }
822
823     exit(99);
824   }
825
826 #else
827
828   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
829   {
830     Parametre param = _job.getParametre();
831     Parametre::iterator it;
832     PROCESS_INFORMATION pi;
833
834     try {
835
836       // EXECUTABLE is MANDATORY, if missing, we throw an exception
837       vector<string> exec_command;
838       if (param.find(EXECUTABLE) != param.end()) {
839         exec_command = _bm.exec_command(param);
840       } else {
841         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
842       }
843
844       // Build the command string from the command argument vector
845       string comstr;
846       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
847         if (i>0) comstr += " ";
848         comstr += exec_command[i];
849       }
850
851       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
852
853       // Create the environment for the new process. Note (RB): Here we change the environment for
854       // the process launched in local. It would seem more logical to set the environment for the
855       // remote process.
856       // Note that if no environment is specified, we reuse the current environment.
857       Environnement env = _job.getEnvironnement();
858       char * chNewEnv = NULL;
859
860       if(env.size() > 0) {
861         chNewEnv = new char[4096];
862         LPTSTR lpszCurrentVariable = chNewEnv;
863         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
864           const string  & key   = (*it).first;
865           const string  & value = (*it).second;
866           string envvar = key + "=" + value;
867           envvar.copy(lpszCurrentVariable, envvar.size());
868           lpszCurrentVariable[envvar.size()] = '\0';
869           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
870         }
871         // Terminate the block with a NULL byte.
872         *lpszCurrentVariable = '\0';
873       }
874
875
876       STARTUPINFO si;
877       ZeroMemory( &si, sizeof(si) );
878       si.cb = sizeof(si);
879       ZeroMemory( &pi, sizeof(pi) );
880
881       // Copy the command to a non-const buffer
882       char * buffer = strdup(comstr.c_str());
883
884       // launch the new process
885       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
886                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
887
888       if (buffer) free(buffer);
889       if (!res) throw RunTimeException("Error while creating new process");
890
891       CloseHandle(pi.hThread);
892
893     } catch (GenericException & e) {
894
895       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
896     }
897
898     return pi.hProcess;
899   }
900
901 #endif
902
903
904   void BatchManager_Local::kill_child_on_exit(void * p_pid)
905   {
906 #ifndef WIN32
907     //TODO: porting of following functionality
908     pid_t child = * static_cast<pid_t *>(p_pid);
909
910     // On tue le fils
911     kill(child, SIGTERM);
912
913     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
914     // mais cette option n'est pas implementee pour le moment, car il est
915     // preferable de laisser le process fils se terminer normalement et seul.
916 #endif
917   }
918
919   void BatchManager_Local::delete_on_exit(void * arg)
920   {
921     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
922     delete p_ta;
923   }
924
925   void BatchManager_Local::setFailedOnCancel(void * arg)
926   {
927     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
928     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
929     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
930     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
931
932     // Unlock the master thread. From here, the batch manager instance should not be used.
933     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
934   }
935
936 }