Salome HOME
e47a42a609ab8def2c87c053ce4d8fec6ae7dbbe
[tools/libbatch.git] / src / Local / BatchManager_Local.cxx
1 //  Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Constants.hxx"
55 #include "IOMutex.hxx"
56 #include "BatchManager_Local.hxx"
57 #include "RunTimeException.hxx"
58 #include "Utils.hxx"
59
60 using namespace std;
61
62 namespace Batch {
63
64
65   // Constructeur
66   BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
67                                          const char * username,
68                                          CommunicationProtocolType protocolType, const char * mpiImpl)
69     : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
70       _idCounter(0)
71   {
72     pthread_mutex_init(&_threads_mutex, NULL);
73     pthread_cond_init(&_threadSyncCondition, NULL);
74   }
75
76   // Destructeur
77   BatchManager_Local::~BatchManager_Local()
78   {
79     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
80       pthread_mutex_lock(&_threads_mutex);
81       string state = iter->second.param[STATE];
82       if (state != FINISHED && state != FAILED) {
83         UNDER_LOCK( cout << "Warning: Job " << iter->first <<
84                             " is not finished, it will now be canceled." << endl );
85         pthread_cancel(iter->second.thread_id);
86         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
87       }
88       pthread_mutex_unlock(&_threads_mutex);
89     }
90     pthread_mutex_destroy(&_threads_mutex);
91     pthread_cond_destroy(&_threadSyncCondition);
92   }
93
94   // Methode pour le controle des jobs : soumet un job au gestionnaire
95   const JobId BatchManager_Local::submitJob(const Job & job)
96   {
97     // export input files in the working directory of the execution host
98     exportInputFiles(job);
99
100     Job_Local jobLocal = job;
101     Id id = _idCounter++;
102     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
103
104     // Les attributs du thread a sa creation
105     pthread_attr_t thread_attr;
106     pthread_attr_init(&thread_attr);
107     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
108
109     // Creation du thread qui va executer la commande systeme qu'on lui passe
110     pthread_t thread_id;
111     pthread_mutex_lock(&_threads_mutex);
112     int rc = pthread_create(&thread_id,
113       &thread_attr,
114       &ThreadAdapter::run,
115       static_cast<void *>(p_ta));
116
117     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
118     pthread_attr_destroy(&thread_attr);
119
120     if (rc != 0) {
121       pthread_mutex_unlock(&_threads_mutex);
122       throw RunTimeException("Can't create new thread in BatchManager_Local");
123     }
124
125     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
126     pthread_mutex_unlock(&_threads_mutex);
127
128     ostringstream id_sst;
129     id_sst << id;
130     return JobId(this, id_sst.str());
131   }
132
133   // Methode pour le controle des jobs : retire un job du gestionnaire
134   void BatchManager_Local::deleteJob(const JobId & jobid)
135   {
136     Id id;
137
138     istringstream iss(jobid.getReference());
139     iss >> id;
140
141     // @@@ --------> SECTION CRITIQUE <-------- @@@
142     pthread_mutex_lock(&_threads_mutex);
143     bool idFound = (_threads.find(id) != _threads.end());
144     if (idFound) {
145       string state = _threads[id].param[STATE];
146       if (state != FINISHED && state != FAILED) {
147         pthread_cancel(_threads[id].thread_id);
148         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
149       } else {
150         cout << "Cannot delete job " << jobid.getReference() <<
151                 ". Job is already finished." << endl;
152       }
153     }
154     pthread_mutex_unlock(&_threads_mutex);
155     // @@@ --------> SECTION CRITIQUE <-------- @@@
156
157     if (!idFound)
158       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
159   }
160
161   // Methode pour le controle des jobs : suspend un job en file d'attente
162   void BatchManager_Local::holdJob(const JobId & jobid)
163   {
164     Id id;
165     istringstream iss(jobid.getReference());
166     iss >> id;
167
168     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
169
170     // On introduit une commande dans la queue du thread
171     // @@@ --------> SECTION CRITIQUE <-------- @@@
172     pthread_mutex_lock(&_threads_mutex);
173     if (_threads.find(id) != _threads.end())
174       _threads[id].command_queue.push(HOLD);
175     pthread_mutex_unlock(&_threads_mutex);
176     // @@@ --------> SECTION CRITIQUE <-------- @@@
177   }
178
179   // Methode pour le controle des jobs : relache un job suspendu
180   void BatchManager_Local::releaseJob(const JobId & jobid)
181   {
182     Id id;
183     istringstream iss(jobid.getReference());
184     iss >> id;
185
186     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
187
188     // On introduit une commande dans la queue du thread
189     // @@@ --------> SECTION CRITIQUE <-------- @@@
190     pthread_mutex_lock(&_threads_mutex);
191     if (_threads.find(id) != _threads.end())
192       _threads[id].command_queue.push(RELEASE);
193     pthread_mutex_unlock(&_threads_mutex);
194     // @@@ --------> SECTION CRITIQUE <-------- @@@
195   }
196
197   // Methode pour le controle des jobs : renvoie l'etat du job
198   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
199   {
200     Id id;
201     istringstream iss(jobid.getReference());
202     iss >> id;
203
204     Parametre param;
205     Environnement env;
206
207     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
208     // @@@ --------> SECTION CRITIQUE <-------- @@@
209     pthread_mutex_lock(&_threads_mutex);
210     std::map<Id, Child >::iterator pos = _threads.find(id);
211     bool found = (pos != _threads.end());
212     if (found) {
213       param = pos->second.param;
214       env   = pos->second.env;
215     }
216     pthread_mutex_unlock(&_threads_mutex);
217     // @@@ --------> SECTION CRITIQUE <-------- @@@
218     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
219
220     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
221
222     JobInfo_Local ji(param, env);
223     return ji;
224   }
225
226
227   // Ce manager ne peut pas reprendre un job
228   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
229   // au JobId
230   const Batch::JobId
231   BatchManager_Local::addJob(const Batch::Job & job, const std::string & reference)
232   {
233     return JobId(this, "undefined");
234   }
235
236   // Methode pour le controle des jobs : teste si un job est present en machine
237   bool BatchManager_Local::isRunning(const JobId & jobid)
238   {
239     Id id;
240     istringstream iss(jobid.getReference());
241     iss >> id;
242
243     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
244     // @@@ --------> SECTION CRITIQUE <-------- @@@
245     pthread_mutex_lock(&_threads_mutex);
246     bool running = (_threads[id].param[STATE].str() == RUNNING);
247     pthread_mutex_unlock(&_threads_mutex);
248     // @@@ --------> SECTION CRITIQUE <-------- @@@
249     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
250
251     return running;
252   }
253
254   string BatchManager_Local::ThreadAdapter::buildCommandFile(const Job_Local & job)
255   {
256     Parametre param = job.getParametre();
257
258     // Mandatory parameters
259     string workDir = "";
260     if (param.find(WORKDIR) != param.end())
261       workDir = param[WORKDIR].str();
262     else
263       throw RunTimeException("param[WORKDIR] is not defined. Please define it, cannot submit this job.");
264     string fileToExecute = "";
265     if (param.find(EXECUTABLE) != param.end())
266       fileToExecute = param[EXECUTABLE].str();
267     else
268       throw RunTimeException("param[EXECUTABLE] is not defined. Please define it, cannot submit this job.");
269
270     string::size_type p1 = fileToExecute.find_last_of("/");
271     string::size_type p2 = fileToExecute.find_last_of(".");
272     string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1);
273     string fileNameToExecute = fileToExecute.substr(p1+1);
274     string remotePath = workDir + "/" + rootNameToExecute + "_launch_job";
275
276     // Create batch submit file
277     ofstream tempOutputFile;
278     string tmpFileName = Utils::createAndOpenTemporaryFile("launch-job", tempOutputFile);
279
280 #ifdef WIN32
281     if (_bm._protocol.getType() == SH) {
282       char drive[_MAX_DRIVE];
283       _splitpath(workDir.c_str(), drive, NULL, NULL, NULL);
284       if (strlen(drive) > 0) tempOutputFile << drive << endl;
285       tempOutputFile << "cd " << Utils::fixPath(workDir) << endl;
286       // Define environment for the job
287       Environnement env = job.getEnvironnement();
288       for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
289         tempOutputFile << "set " << iter->first << "=" << iter->second << endl;
290       }
291       // Launch the executable
292       tempOutputFile << fileNameToExecute;
293       if (param.find(ARGUMENTS) != param.end()) {
294         Versatile V = param[ARGUMENTS];
295         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
296           StringType argt = * static_cast<StringType *>(*it);
297           string     arg  = argt;
298           tempOutputFile << " " << arg;
299         }
300       }
301       remotePath += ".bat";
302     } else {
303 #endif
304
305     tempOutputFile << "#!/bin/sh" << endl;
306     tempOutputFile << "cd " << workDir << endl;
307
308     // Optional parameters (system limits on the job process)
309     if (param.find(MAXCPUTIME) != param.end()) {
310       long maxcputime = (long)param[MAXCPUTIME] * 60;
311       tempOutputFile << "ulimit -H -t " << maxcputime << endl;
312     }
313
314     if (param.find(MAXDISKSIZE) != param.end()) {
315       long maxdisksize = (long)param[MAXDISKSIZE] * 1024;
316       tempOutputFile << "ulimit -H -f " << maxdisksize << endl;
317     }
318
319     if (param.find(MAXRAMSIZE) != param.end()) {
320       long maxramsize = (long)param[MAXRAMSIZE] * 1024;
321       tempOutputFile << "ulimit -H -v " << maxramsize << endl;
322     }
323
324     // Define environment for the job
325     Environnement env = job.getEnvironnement();
326     for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
327       tempOutputFile << "export " << iter->first << "=" << iter->second << endl;
328     }
329
330     // generate nodes file
331     tempOutputFile << "LIBBATCH_NODEFILE=`mktemp nodefile-XXXXXXXXXX`" << endl;
332     tempOutputFile << "echo `hostname` > $LIBBATCH_NODEFILE" << endl;
333     tempOutputFile << "export LIBBATCH_NODEFILE" << endl;
334
335     // Launch the executable
336     tempOutputFile << "./" + fileNameToExecute;
337     if (param.find(ARGUMENTS) != param.end()) {
338       Versatile V = param[ARGUMENTS];
339       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
340         StringType argt = * static_cast<StringType *>(*it);
341         string     arg  = argt;
342         tempOutputFile << " " << arg;
343       }
344     }
345
346     // Standard input and output
347     if (param.find(INFILE) != param.end()) {
348       Versatile V = param[INFILE];
349       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
350         Couple cpl = * static_cast<CoupleType*>(*it);
351         string remote = cpl.getRemote();
352         if (remote == "stdin")
353           tempOutputFile << " <stdin";
354       }
355     }
356
357     string stdoutFile = workDir + "/logs/output.log." + rootNameToExecute;
358     string stderrFile = workDir + "/logs/error.log." + rootNameToExecute;
359     if (param.find(OUTFILE) != param.end()) {
360       Versatile V = param[OUTFILE];
361       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
362         Couple cpl = * static_cast<CoupleType*>(*it);
363         string remote = cpl.getRemote();
364         if (remote == "stdout") stdoutFile = "stdout";
365         if (remote == "stderr") stderrFile = "stderr";
366       }
367     }
368     tempOutputFile << " 1>" << stdoutFile << " 2>" << stderrFile << endl;
369
370     // Remove the node file
371     tempOutputFile << "rm $LIBBATCH_NODEFILE" << endl;
372
373 #ifdef WIN32
374     }
375 #endif
376
377     tempOutputFile.flush();
378     tempOutputFile.close();
379
380     cerr << "Batch script file generated is: " << tmpFileName << endl;
381
382     Utils::chmod(tmpFileName.c_str(), 0x1ED);
383     int status = _bm._protocol.copyFile(tmpFileName, "", "",
384                                         remotePath, _bm._hostname, _bm._username);
385     if (status)
386       throw RunTimeException("Cannot copy command file on host " + _bm._hostname);
387
388 #ifdef WIN32
389     if (_bm._protocol.getType() != SH) {
390       // On Windows, we make the remote file executable afterward because
391       // pscp does not preserve access permissions on files
392       string subCommand = string("chmod u+x ") + remotePath;
393       string command = _bm._protocol.getExecCommand(subCommand, _bm._hostname, _bm._username);
394       cerr << command.c_str() << endl;
395       status = system(command.c_str());
396       if (status) {
397         std::ostringstream oss;
398         oss << "Cannot change permissions of file " << remotePath << " on host " << _bm._hostname;
399         oss << ". Return status is " << status;
400         throw RunTimeException(oss.str());
401       }
402     }
403 #endif
404
405     return remotePath;
406   }
407
408
409
410   // Constructeur de la classe ThreadAdapter
411   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
412   _bm(bm), _job(job), _id(id)
413   {
414     // Nothing to do
415   }
416
417
418
419   // Methode d'execution du thread
420   void * BatchManager_Local::ThreadAdapter::run(void * arg)
421   {
422     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
423
424 #ifndef WIN32
425     // On bloque tous les signaux pour ce thread
426     sigset_t setmask;
427     sigfillset(&setmask);
428     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
429 #endif
430
431     // On autorise la terminaison differee du thread
432     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
433     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
434     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
435
436     // On enregistre la fonction de suppression du fils en cas d'arret du thread
437     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
438     // sera prise en compte par pthread_testcancel()
439     Process child;
440     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
441     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
442     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
443
444     // On forke/exec un nouveau process pour pouvoir controler le fils
445     // (plus finement qu'avec un appel system)
446     // int rc = system(commande.c_str());
447     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
448     //execv("/usr/bin/ssh", parmList);
449 #ifdef WIN32
450     child = p_ta->launchWin32ChildProcess();
451     p_ta->pere(child);
452 #else
453     child = fork();
454     if (child < 0) { // erreur
455       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
456
457     } else if (child > 0) { // pere
458       p_ta->pere(child);
459
460     } else { // fils
461       p_ta->fils();
462     }
463 #endif
464
465     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
466
467     // Set the job state to FINISHED or FAILED
468     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
469
470     // On retire la fonction de nettoyage de la memoire
471     pthread_cleanup_pop(0);
472
473     // On retire la fonction de suppression du fils
474     pthread_cleanup_pop(0);
475
476     // remove setFailedOnCancel function from cancel stack
477     pthread_cleanup_pop(0);
478
479     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
480
481     // On invoque la fonction de nettoyage de la memoire
482     delete_on_exit(arg);
483
484     UNDER_LOCK( cout << "Father is leaving" << endl );
485     pthread_exit(NULL);
486     return NULL;
487   }
488
489
490
491
492   void BatchManager_Local::ThreadAdapter::pere(Process child)
493   {
494     time_t child_starttime = time(NULL);
495
496     // On enregistre le fils dans la table des threads
497     pthread_t thread_id = pthread_self();
498
499     Parametre param   = _job.getParametre();
500     Environnement env = _job.getEnvironnement();
501
502     ostringstream id_sst;
503     id_sst << _id;
504     param[ID]         = id_sst.str();
505     param[STATE]      = Batch::RUNNING;
506
507     _bm._threads[_id].thread_id = thread_id;
508 #ifndef WIN32
509     _bm._threads[_id].pid       = child;
510 #endif
511     _bm._threads[_id].hasFailed = false;
512     _bm._threads[_id].param     = param;
513     _bm._threads[_id].env       = env;
514     _bm._threads[_id].command_queue.push(NOP);
515
516     // Unlock the master thread. From here, all shared variables must be protected
517     // from concurrent access
518     pthread_cond_signal(&_bm._threadSyncCondition);
519
520
521     // on boucle en attendant que le fils ait termine
522     while (1) {
523 #ifdef WIN32
524       DWORD exitCode;
525       GetExitCodeProcess(child, &exitCode);
526       if (exitCode != STILL_ACTIVE) {
527         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
528         break;
529       }
530 #else
531       int child_rc = 0;
532       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
533       if (child_wait_rc > 0) {
534          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
535          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
536          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
537          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
538          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
539          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
540          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
541 #ifdef WIFCONTINUED
542          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
543 #endif
544         if (WIFSTOPPED(child_rc)) {
545           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
546           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
547           // desactive car s'il est possible de detecter l'arret d'un process, il est
548           // plus difficile de detecter sa reprise.
549
550           // Le fils est simplement stoppe
551           // @@@ --------> SECTION CRITIQUE <-------- @@@
552           pthread_mutex_lock(&_bm._threads_mutex);
553           _bm._threads[_id].param[STATE] = Batch::PAUSED;
554           pthread_mutex_unlock(&_bm._threads_mutex);
555           // @@@ --------> SECTION CRITIQUE <-------- @@@
556           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
557
558         }
559         else {
560           // Le fils est termine, on sort de la boucle et du if englobant
561           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
562           break;
563         }
564       }
565       else if (child_wait_rc == -1) {
566         // Le fils a disparu ...
567         // @@@ --------> SECTION CRITIQUE <-------- @@@
568         pthread_mutex_lock(&_bm._threads_mutex);
569         _bm._threads[_id].hasFailed = true;
570         pthread_mutex_unlock(&_bm._threads_mutex);
571         // @@@ --------> SECTION CRITIQUE <-------- @@@
572         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
573         break;
574       }
575 #endif
576
577       // On teste si le thread doit etre detruit
578       pthread_testcancel();
579
580
581
582       // On regarde si le fils n'a pas depasse son temps (wallclock time)
583       time_t child_currenttime = time(NULL);
584       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
585       if (param.find(MAXWALLTIME) != param.end()) {
586         long maxwalltime = param[MAXWALLTIME];
587         //        cout << "child_starttime          = " << child_starttime        << endl
588         //             << "child_currenttime        = " << child_currenttime      << endl
589         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
590         //             << "maxwalltime              = " << maxwalltime            << endl
591         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
592         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
593           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
594           // On introduit une commande dans la queue du thread
595           // @@@ --------> SECTION CRITIQUE <-------- @@@
596           pthread_mutex_lock(&_bm._threads_mutex);
597           if (_bm._threads.find(_id) != _bm._threads.end())
598             _bm._threads[_id].command_queue.push(KILL);
599           pthread_mutex_unlock(&_bm._threads_mutex);
600           // @@@ --------> SECTION CRITIQUE <-------- @@@
601
602
603         } else if (child_elapsedtime_minutes > maxwalltime ) {
604           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
605           // On introduit une commande dans la queue du thread
606           // @@@ --------> SECTION CRITIQUE <-------- @@@
607           pthread_mutex_lock(&_bm._threads_mutex);
608           if (_bm._threads.find(_id) != _bm._threads.end())
609             _bm._threads[_id].command_queue.push(TERM);
610           pthread_mutex_unlock(&_bm._threads_mutex);
611           // @@@ --------> SECTION CRITIQUE <-------- @@@
612         }
613       }
614
615
616
617       // On regarde s'il y a quelque chose a faire dans la queue de commande
618       // @@@ --------> SECTION CRITIQUE <-------- @@@
619       pthread_mutex_lock(&_bm._threads_mutex);
620       if (_bm._threads.find(_id) != _bm._threads.end()) {
621         while (_bm._threads[_id].command_queue.size() > 0) {
622           Commande cmd = _bm._threads[_id].command_queue.front();
623           _bm._threads[_id].command_queue.pop();
624
625           switch (cmd) {
626     case NOP:
627       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
628       break;
629 #ifndef WIN32
630     case HOLD:
631       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
632       kill(child, SIGSTOP);
633       break;
634
635     case RELEASE:
636       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
637       kill(child, SIGCONT);
638       break;
639
640     case TERM:
641       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
642       kill(child, SIGTERM);
643       break;
644
645     case KILL:
646       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
647       kill(child, SIGKILL);
648       break;
649 #endif
650     case ALTER:
651       break;
652
653     default:
654       break;
655           }
656         }
657
658       }
659       pthread_mutex_unlock(&_bm._threads_mutex);
660       // @@@ --------> SECTION CRITIQUE <-------- @@@
661
662       // On fait une petite pause pour ne pas surcharger inutilement le processeur
663 #ifdef WIN32
664       Sleep(1000);
665 #else
666       sleep(1);
667 #endif
668
669     }
670
671   }
672
673
674
675 #ifndef WIN32
676
677   void BatchManager_Local::ThreadAdapter::fils()
678   {
679     Parametre param = _job.getParametre();
680     Parametre::iterator it;
681
682     try {
683
684       // build script file to launch the job and copy it on the server
685       string cmdFilePath = buildCommandFile(_job);
686
687       // define command to submit the job
688       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
689
690       // Build the argument array argv from the command
691       char ** argv = new char * [command.size() + 1];
692       string comstr;
693       for (string::size_type i=0 ; i<command.size() ; i++) {
694         argv[i] = new char[command[i].size() + 1];
695         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
696         comstr += command[i] + " ";
697       }
698       argv[command.size()] = NULL;
699       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
700
701       // On cree une session pour le fils de facon a ce qu'il ne soit pas
702       // detruit lorsque le shell se termine (le shell ouvre une session et
703       // tue tous les process appartenant a la session en quittant)
704       setsid();
705
706       // On ferme les descripteurs de fichiers standards
707       //close(STDIN_FILENO);
708       //close(STDOUT_FILENO);
709       //close(STDERR_FILENO);
710
711       // On execute la commande du fils
712       execv(argv[0], argv);
713       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
714       // No need to deallocate since nothing happens after a successful exec
715
716       // Normalement on ne devrait jamais arriver ici
717       ofstream file_err("error.log");
718       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
719
720     } catch (GenericException & e) {
721
722       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
723     }
724
725     exit(99);
726   }
727
728 #else
729
730   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
731   {
732     Parametre param = _job.getParametre();
733     Parametre::iterator it;
734     PROCESS_INFORMATION pi;
735
736     try {
737
738       // build script file to launch the job and copy it on the server
739       string cmdFilePath = buildCommandFile(_job);
740
741       // define command to submit the job
742       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
743
744       // Build the command string from the command argument vector
745       string comstr;
746       for (unsigned int i=0 ; i<command.size() ; i++) {
747         if (i>0) comstr += " ";
748         comstr += command[i];
749       }
750
751       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
752
753       STARTUPINFO si;
754       ZeroMemory( &si, sizeof(si) );
755       si.cb = sizeof(si);
756       ZeroMemory( &pi, sizeof(pi) );
757
758       // Copy the command to a non-const buffer
759       char * buffer = strdup(comstr.c_str());
760
761       // launch the new process
762       bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
763                                CREATE_NO_WINDOW, NULL, NULL, &si, &pi);
764
765       if (buffer) free(buffer);
766       if (!res) throw RunTimeException("Error while creating new process");
767
768       CloseHandle(pi.hThread);
769
770     } catch (GenericException & e) {
771
772       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
773     }
774
775     return pi.hProcess;
776   }
777
778 #endif
779
780
781   void BatchManager_Local::kill_child_on_exit(void * p_pid)
782   {
783 #ifndef WIN32
784     //TODO: porting of following functionality
785     pid_t child = * static_cast<pid_t *>(p_pid);
786
787     // On tue le fils
788     kill(child, SIGTERM);
789
790     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
791     // mais cette option n'est pas implementee pour le moment, car il est
792     // preferable de laisser le process fils se terminer normalement et seul.
793 #endif
794   }
795
796   void BatchManager_Local::delete_on_exit(void * arg)
797   {
798     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
799     delete p_ta;
800   }
801
802   void BatchManager_Local::setFailedOnCancel(void * arg)
803   {
804     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
805     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
806     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
807     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
808
809     // Unlock the master thread. From here, the batch manager instance should not be used.
810     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
811   }
812
813 }