Salome HOME
Merge branch 'rbe/evol-job-newparams'
[tools/libbatch.git] / src / Local / BatchManager_Local.cxx
1 //  Copyright (C) 2007-2013  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Constants.hxx"
55 #include "IOMutex.hxx"
56 #include "BatchManager_Local.hxx"
57 #include "RunTimeException.hxx"
58 #include "Utils.hxx"
59 #include "Log.hxx"
60
61 using namespace std;
62
63 namespace Batch {
64
65
66   // Constructeur
67   BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
68                                          const char * username,
69                                          CommunicationProtocolType protocolType, const char * mpiImpl)
70     : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
71       _idCounter(0)
72   {
73     pthread_mutex_init(&_threads_mutex, NULL);
74     pthread_cond_init(&_threadSyncCondition, NULL);
75   }
76
77   // Destructeur
78   BatchManager_Local::~BatchManager_Local()
79   {
80     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
81       pthread_mutex_lock(&_threads_mutex);
82       string state = iter->second.param[STATE];
83       if (state != FINISHED && state != FAILED) {
84         UNDER_LOCK( LOG("Warning: Job " << iter->first <<
85                         " is not finished, it will now be canceled."));
86         pthread_cancel(iter->second.thread_id);
87         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
88       }
89       pthread_mutex_unlock(&_threads_mutex);
90     }
91     pthread_mutex_destroy(&_threads_mutex);
92     pthread_cond_destroy(&_threadSyncCondition);
93   }
94
95   // Methode pour le controle des jobs : soumet un job au gestionnaire
96   const JobId BatchManager_Local::submitJob(const Job & job)
97   {
98     // export input files in the working directory of the execution host
99     exportInputFiles(job);
100
101     Job_Local jobLocal = job;
102     Id id = _idCounter++;
103     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
104
105     // Les attributs du thread a sa creation
106     pthread_attr_t thread_attr;
107     pthread_attr_init(&thread_attr);
108     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
109
110     // Creation du thread qui va executer la commande systeme qu'on lui passe
111     pthread_t thread_id;
112     pthread_mutex_lock(&_threads_mutex);
113     int rc = pthread_create(&thread_id,
114       &thread_attr,
115       &ThreadAdapter::run,
116       static_cast<void *>(p_ta));
117
118     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
119     pthread_attr_destroy(&thread_attr);
120
121     if (rc != 0) {
122       pthread_mutex_unlock(&_threads_mutex);
123       throw RunTimeException("Can't create new thread in BatchManager_Local");
124     }
125
126     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
127     pthread_mutex_unlock(&_threads_mutex);
128
129     ostringstream id_sst;
130     id_sst << id;
131     return JobId(this, id_sst.str());
132   }
133
134   // Methode pour le controle des jobs : retire un job du gestionnaire
135   void BatchManager_Local::deleteJob(const JobId & jobid)
136   {
137     Id id;
138
139     istringstream iss(jobid.getReference());
140     iss >> id;
141
142     // @@@ --------> SECTION CRITIQUE <-------- @@@
143     pthread_mutex_lock(&_threads_mutex);
144     bool idFound = (_threads.find(id) != _threads.end());
145     if (idFound) {
146       string state = _threads[id].param[STATE];
147       if (state != FINISHED && state != FAILED) {
148         pthread_cancel(_threads[id].thread_id);
149         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
150       } else {
151         LOG("Cannot delete job " << jobid.getReference() << ". Job is already finished.");
152       }
153     }
154     pthread_mutex_unlock(&_threads_mutex);
155     // @@@ --------> SECTION CRITIQUE <-------- @@@
156
157     if (!idFound)
158       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
159   }
160
161   // Methode pour le controle des jobs : suspend un job en file d'attente
162   void BatchManager_Local::holdJob(const JobId & jobid)
163   {
164     Id id;
165     istringstream iss(jobid.getReference());
166     iss >> id;
167
168     UNDER_LOCK( LOG("BatchManager is sending HOLD command to the thread " << id) );
169
170     // On introduit une commande dans la queue du thread
171     // @@@ --------> SECTION CRITIQUE <-------- @@@
172     pthread_mutex_lock(&_threads_mutex);
173     if (_threads.find(id) != _threads.end())
174       _threads[id].command_queue.push(HOLD);
175     pthread_mutex_unlock(&_threads_mutex);
176     // @@@ --------> SECTION CRITIQUE <-------- @@@
177   }
178
179   // Methode pour le controle des jobs : relache un job suspendu
180   void BatchManager_Local::releaseJob(const JobId & jobid)
181   {
182     Id id;
183     istringstream iss(jobid.getReference());
184     iss >> id;
185
186     UNDER_LOCK( LOG("BatchManager is sending RELEASE command to the thread " << id) );
187
188     // On introduit une commande dans la queue du thread
189     // @@@ --------> SECTION CRITIQUE <-------- @@@
190     pthread_mutex_lock(&_threads_mutex);
191     if (_threads.find(id) != _threads.end())
192       _threads[id].command_queue.push(RELEASE);
193     pthread_mutex_unlock(&_threads_mutex);
194     // @@@ --------> SECTION CRITIQUE <-------- @@@
195   }
196
197   // Methode pour le controle des jobs : renvoie l'etat du job
198   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
199   {
200     Id id;
201     istringstream iss(jobid.getReference());
202     iss >> id;
203
204     Parametre param;
205     Environnement env;
206
207     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
208     // @@@ --------> SECTION CRITIQUE <-------- @@@
209     pthread_mutex_lock(&_threads_mutex);
210     std::map<Id, Child >::iterator pos = _threads.find(id);
211     bool found = (pos != _threads.end());
212     if (found) {
213       param = pos->second.param;
214       env   = pos->second.env;
215     }
216     pthread_mutex_unlock(&_threads_mutex);
217     // @@@ --------> SECTION CRITIQUE <-------- @@@
218     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
219
220     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
221
222     JobInfo_Local ji(param, env);
223     return ji;
224   }
225
226
227   // Ce manager ne peut pas reprendre un job
228   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
229   // au JobId
230   const Batch::JobId
231   BatchManager_Local::addJob(const Batch::Job & job, const std::string & reference)
232   {
233     return JobId(this, "undefined");
234   }
235
236   // Methode pour le controle des jobs : teste si un job est present en machine
237   bool BatchManager_Local::isRunning(const JobId & jobid)
238   {
239     Id id;
240     istringstream iss(jobid.getReference());
241     iss >> id;
242
243     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
244     // @@@ --------> SECTION CRITIQUE <-------- @@@
245     pthread_mutex_lock(&_threads_mutex);
246     bool running = (_threads[id].param[STATE].str() == RUNNING);
247     pthread_mutex_unlock(&_threads_mutex);
248     // @@@ --------> SECTION CRITIQUE <-------- @@@
249     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
250
251     return running;
252   }
253
254   string BatchManager_Local::ThreadAdapter::buildCommandFile(const Job_Local & job)
255   {
256     Parametre param = job.getParametre();
257
258     // Mandatory parameters
259     string workDir = "";
260     if (param.find(WORKDIR) != param.end())
261       workDir = param[WORKDIR].str();
262     else
263       throw RunTimeException("param[WORKDIR] is not defined. Please define it, cannot submit this job.");
264     string fileToExecute = "";
265     if (param.find(EXECUTABLE) != param.end())
266       fileToExecute = param[EXECUTABLE].str();
267     else
268       throw RunTimeException("param[EXECUTABLE] is not defined. Please define it, cannot submit this job.");
269
270     string::size_type p1 = fileToExecute.find_last_of("/");
271     string::size_type p2 = fileToExecute.find_last_of(".");
272     string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1);
273     string fileNameToExecute = fileToExecute.substr(p1+1);
274     string remotePath = workDir + "/" + rootNameToExecute + "_launch_job";
275
276     // Create batch submit file
277     ofstream tempOutputFile;
278     string tmpFileName = Utils::createAndOpenTemporaryFile("launch-job", tempOutputFile);
279
280 #ifdef WIN32
281     if (_bm._protocol.getType() == SH) {
282       char drive[_MAX_DRIVE];
283       _splitpath(workDir.c_str(), drive, NULL, NULL, NULL);
284       if (strlen(drive) > 0) tempOutputFile << drive << endl;
285       tempOutputFile << "cd " << Utils::fixPath(workDir) << endl;
286       // Define environment for the job
287       Environnement env = job.getEnvironnement();
288       for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
289         tempOutputFile << "set " << iter->first << "=" << iter->second << endl;
290       }
291       // Launch the executable
292       tempOutputFile << fileNameToExecute;
293       if (param.find(ARGUMENTS) != param.end()) {
294         Versatile V = param[ARGUMENTS];
295         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
296           StringType argt = * static_cast<StringType *>(*it);
297           string     arg  = argt;
298           tempOutputFile << " " << arg;
299         }
300       }
301       remotePath += ".bat";
302     } else {
303 #endif
304
305     tempOutputFile << "#!/bin/sh" << endl;
306     tempOutputFile << "cd " << workDir << endl;
307
308     // Optional parameters (system limits on the job process)
309     if (param.find(MAXCPUTIME) != param.end()) {
310       long maxcputime = (long)param[MAXCPUTIME] * 60;
311       tempOutputFile << "ulimit -H -t " << maxcputime << endl;
312     }
313
314     if (param.find(MAXDISKSIZE) != param.end()) {
315       long maxdisksize = (long)param[MAXDISKSIZE] * 1024;
316       tempOutputFile << "ulimit -H -f " << maxdisksize << endl;
317     }
318
319     if (param.find(MAXRAMSIZE) != param.end()) {
320       long maxramsize = (long)param[MAXRAMSIZE] * 1024;
321       tempOutputFile << "ulimit -H -v " << maxramsize << endl;
322     }
323
324     // Number of cores to use
325     int nbproc = 1;
326     if (param.find(NBPROC) != param.end())
327       nbproc = param[NBPROC];
328
329     // Define environment for the job
330     Environnement env = job.getEnvironnement();
331     for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
332       tempOutputFile << "export " << iter->first << "=" << iter->second << endl;
333     }
334
335     // generate nodes file
336     tempOutputFile << "LIBBATCH_NODEFILE=`mktemp nodefile-XXXXXXXXXX`" << endl;
337     for (int i=0 ; i<nbproc ; i++)
338       tempOutputFile << "echo `hostname` >> $LIBBATCH_NODEFILE" << endl;
339     tempOutputFile << "export LIBBATCH_NODEFILE" << endl;
340
341     // Launch the executable
342     tempOutputFile << "./" + fileNameToExecute;
343     if (param.find(ARGUMENTS) != param.end()) {
344       Versatile V = param[ARGUMENTS];
345       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
346         StringType argt = * static_cast<StringType *>(*it);
347         string     arg  = argt;
348         tempOutputFile << " " << arg;
349       }
350     }
351
352     // Standard input and output
353     if (param.find(INFILE) != param.end()) {
354       Versatile V = param[INFILE];
355       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
356         Couple cpl = * static_cast<CoupleType*>(*it);
357         string remote = cpl.getRemote();
358         if (remote == "stdin")
359           tempOutputFile << " <stdin";
360       }
361     }
362
363     string stdoutFile = workDir + "/logs/output.log." + rootNameToExecute;
364     string stderrFile = workDir + "/logs/error.log." + rootNameToExecute;
365     if (param.find(OUTFILE) != param.end()) {
366       Versatile V = param[OUTFILE];
367       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
368         Couple cpl = * static_cast<CoupleType*>(*it);
369         string remote = cpl.getRemote();
370         if (remote == "stdout") stdoutFile = "stdout";
371         if (remote == "stderr") stderrFile = "stderr";
372       }
373     }
374     tempOutputFile << " 1>" << stdoutFile << " 2>" << stderrFile << endl;
375
376     // Remove the node file
377     tempOutputFile << "rm $LIBBATCH_NODEFILE" << endl;
378
379 #ifdef WIN32
380     }
381 #endif
382
383     tempOutputFile.flush();
384     tempOutputFile.close();
385
386     LOG("Batch script file generated is: " << tmpFileName);
387
388     Utils::chmod(tmpFileName.c_str(), 0x1ED);
389     int status = _bm._protocol.copyFile(tmpFileName, "", "",
390                                         remotePath, _bm._hostname, _bm._username);
391     if (status)
392       throw RunTimeException("Cannot copy command file on host " + _bm._hostname);
393
394 #ifdef WIN32
395     if (_bm._protocol.getType() != SH) {
396       // On Windows, we make the remote file executable afterward because
397       // pscp does not preserve access permissions on files
398       string subCommand = string("chmod u+x ") + remotePath;
399       string command = _bm._protocol.getExecCommand(subCommand, _bm._hostname, _bm._username);
400       LOG(command);
401       status = system(command.c_str());
402       if (status) {
403         std::ostringstream oss;
404         oss << "Cannot change permissions of file " << remotePath << " on host " << _bm._hostname;
405         oss << ". Return status is " << status;
406         throw RunTimeException(oss.str());
407       }
408     }
409 #endif
410
411     return remotePath;
412   }
413
414
415
416   // Constructeur de la classe ThreadAdapter
417   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
418   _bm(bm), _job(job), _id(id)
419   {
420     // Nothing to do
421   }
422
423
424
425   // Methode d'execution du thread
426   void * BatchManager_Local::ThreadAdapter::run(void * arg)
427   {
428     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
429
430 #ifndef WIN32
431     // On bloque tous les signaux pour ce thread
432     sigset_t setmask;
433     sigfillset(&setmask);
434     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
435 #endif
436
437     // On autorise la terminaison differee du thread
438     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
439     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
440     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
441
442     // On enregistre la fonction de suppression du fils en cas d'arret du thread
443     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
444     // sera prise en compte par pthread_testcancel()
445     Process child;
446     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
447     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
448     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
449
450     // On forke/exec un nouveau process pour pouvoir controler le fils
451     // (plus finement qu'avec un appel system)
452     // int rc = system(commande.c_str());
453     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
454     //execv("/usr/bin/ssh", parmList);
455 #ifdef WIN32
456     child = p_ta->launchWin32ChildProcess();
457     p_ta->pere(child);
458 #else
459     child = fork();
460     if (child < 0) { // erreur
461       UNDER_LOCK( LOG("Fork impossible (rc=" << child << ")") );
462
463     } else if (child > 0) { // pere
464       p_ta->pere(child);
465
466     } else { // fils
467       p_ta->fils();
468     }
469 #endif
470
471     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
472
473     // Set the job state to FINISHED or FAILED
474     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
475
476     // On retire la fonction de nettoyage de la memoire
477     pthread_cleanup_pop(0);
478
479     // On retire la fonction de suppression du fils
480     pthread_cleanup_pop(0);
481
482     // remove setFailedOnCancel function from cancel stack
483     pthread_cleanup_pop(0);
484
485     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
486
487     // On invoque la fonction de nettoyage de la memoire
488     delete_on_exit(arg);
489
490     UNDER_LOCK( LOG("Father is leaving") );
491     pthread_exit(NULL);
492     return NULL;
493   }
494
495
496
497
498   void BatchManager_Local::ThreadAdapter::pere(Process child)
499   {
500     time_t child_starttime = time(NULL);
501
502     // On enregistre le fils dans la table des threads
503     pthread_t thread_id = pthread_self();
504
505     Parametre param   = _job.getParametre();
506     Environnement env = _job.getEnvironnement();
507
508     ostringstream id_sst;
509     id_sst << _id;
510     param[ID]         = id_sst.str();
511     param[STATE]      = Batch::RUNNING;
512
513     _bm._threads[_id].thread_id = thread_id;
514 #ifndef WIN32
515     _bm._threads[_id].pid       = child;
516 #endif
517     _bm._threads[_id].hasFailed = false;
518     _bm._threads[_id].param     = param;
519     _bm._threads[_id].env       = env;
520     _bm._threads[_id].command_queue.push(NOP);
521
522     // Unlock the master thread. From here, all shared variables must be protected
523     // from concurrent access
524     pthread_cond_signal(&_bm._threadSyncCondition);
525
526
527     // on boucle en attendant que le fils ait termine
528     while (1) {
529 #ifdef WIN32
530       DWORD exitCode;
531       GetExitCodeProcess(child, &exitCode);
532       if (exitCode != STILL_ACTIVE) {
533         UNDER_LOCK( LOG("Father sees his child is DONE: exit code = " << exitCode) );
534         break;
535       }
536 #else
537       int child_rc = 0;
538       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
539       if (child_wait_rc > 0) {
540          UNDER_LOCK( LOG("Status is: " << WIFEXITED( child_rc)) );
541          UNDER_LOCK( LOG("Status is: " << WEXITSTATUS( child_rc)) );
542          UNDER_LOCK( LOG("Status is: " << WIFSIGNALED( child_rc)) );
543          UNDER_LOCK( LOG("Status is: " << WTERMSIG( child_rc)) );
544          UNDER_LOCK( LOG("Status is: " << WCOREDUMP( child_rc)) );
545          UNDER_LOCK( LOG("Status is: " << WIFSTOPPED( child_rc)) );
546          UNDER_LOCK( LOG("Status is: " << WSTOPSIG( child_rc)) );
547 #ifdef WIFCONTINUED
548          UNDER_LOCK( LOG("Status is: " << WIFCONTINUED( child_rc)) ); // not compilable on sarge
549 #endif
550         if (WIFSTOPPED(child_rc)) {
551           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
552           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
553           // desactive car s'il est possible de detecter l'arret d'un process, il est
554           // plus difficile de detecter sa reprise.
555
556           // Le fils est simplement stoppe
557           // @@@ --------> SECTION CRITIQUE <-------- @@@
558           pthread_mutex_lock(&_bm._threads_mutex);
559           _bm._threads[_id].param[STATE] = Batch::PAUSED;
560           pthread_mutex_unlock(&_bm._threads_mutex);
561           // @@@ --------> SECTION CRITIQUE <-------- @@@
562           UNDER_LOCK( LOG("Father sees his child is STOPPED : " << child_wait_rc) );
563
564         }
565         else {
566           // Le fils est termine, on sort de la boucle et du if englobant
567           UNDER_LOCK( LOG("Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")") );
568           break;
569         }
570       }
571       else if (child_wait_rc == -1) {
572         // Le fils a disparu ...
573         // @@@ --------> SECTION CRITIQUE <-------- @@@
574         pthread_mutex_lock(&_bm._threads_mutex);
575         _bm._threads[_id].hasFailed = true;
576         pthread_mutex_unlock(&_bm._threads_mutex);
577         // @@@ --------> SECTION CRITIQUE <-------- @@@
578         UNDER_LOCK( LOG("Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")") );
579         break;
580       }
581 #endif
582
583       // On teste si le thread doit etre detruit
584       pthread_testcancel();
585
586
587
588       // On regarde si le fils n'a pas depasse son temps (wallclock time)
589       time_t child_currenttime = time(NULL);
590       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
591       if (param.find(MAXWALLTIME) != param.end()) {
592         long maxwalltime = param[MAXWALLTIME];
593         //        cout << "child_starttime          = " << child_starttime        << endl
594         //             << "child_currenttime        = " << child_currenttime      << endl
595         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
596         //             << "maxwalltime              = " << maxwalltime            << endl
597         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
598         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
599           UNDER_LOCK( LOG("Father is sending KILL command to the thread " << _id) );
600           // On introduit une commande dans la queue du thread
601           // @@@ --------> SECTION CRITIQUE <-------- @@@
602           pthread_mutex_lock(&_bm._threads_mutex);
603           if (_bm._threads.find(_id) != _bm._threads.end())
604             _bm._threads[_id].command_queue.push(KILL);
605           pthread_mutex_unlock(&_bm._threads_mutex);
606           // @@@ --------> SECTION CRITIQUE <-------- @@@
607
608
609         } else if (child_elapsedtime_minutes > maxwalltime ) {
610           UNDER_LOCK( LOG("Father is sending TERM command to the thread " << _id) );
611           // On introduit une commande dans la queue du thread
612           // @@@ --------> SECTION CRITIQUE <-------- @@@
613           pthread_mutex_lock(&_bm._threads_mutex);
614           if (_bm._threads.find(_id) != _bm._threads.end())
615             _bm._threads[_id].command_queue.push(TERM);
616           pthread_mutex_unlock(&_bm._threads_mutex);
617           // @@@ --------> SECTION CRITIQUE <-------- @@@
618         }
619       }
620
621
622
623       // On regarde s'il y a quelque chose a faire dans la queue de commande
624       // @@@ --------> SECTION CRITIQUE <-------- @@@
625       pthread_mutex_lock(&_bm._threads_mutex);
626       if (_bm._threads.find(_id) != _bm._threads.end()) {
627         while (_bm._threads[_id].command_queue.size() > 0) {
628           Commande cmd = _bm._threads[_id].command_queue.front();
629           _bm._threads[_id].command_queue.pop();
630
631           switch (cmd) {
632     case NOP:
633       UNDER_LOCK( LOG("Father does nothing to his child") );
634       break;
635 #ifndef WIN32
636     case HOLD:
637       UNDER_LOCK( LOG("Father is sending SIGSTOP signal to his child") );
638       kill(child, SIGSTOP);
639       break;
640
641     case RELEASE:
642       UNDER_LOCK( LOG("Father is sending SIGCONT signal to his child") );
643       kill(child, SIGCONT);
644       break;
645
646     case TERM:
647       UNDER_LOCK( LOG("Father is sending SIGTERM signal to his child") );
648       kill(child, SIGTERM);
649       break;
650
651     case KILL:
652       UNDER_LOCK( LOG("Father is sending SIGKILL signal to his child") );
653       kill(child, SIGKILL);
654       break;
655 #endif
656     case ALTER:
657       break;
658
659     default:
660       break;
661           }
662         }
663
664       }
665       pthread_mutex_unlock(&_bm._threads_mutex);
666       // @@@ --------> SECTION CRITIQUE <-------- @@@
667
668       // On fait une petite pause pour ne pas surcharger inutilement le processeur
669 #ifdef WIN32
670       Sleep(1000);
671 #else
672       sleep(1);
673 #endif
674
675     }
676
677   }
678
679
680
681 #ifndef WIN32
682
683   void BatchManager_Local::ThreadAdapter::fils()
684   {
685     Parametre param = _job.getParametre();
686     Parametre::iterator it;
687
688     try {
689
690       // build script file to launch the job and copy it on the server
691       string cmdFilePath = buildCommandFile(_job);
692
693       // define command to submit the job
694       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
695
696       // Build the argument array argv from the command
697       char ** argv = new char * [command.size() + 1];
698       string comstr;
699       for (string::size_type i=0 ; i<command.size() ; i++) {
700         argv[i] = new char[command[i].size() + 1];
701         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
702         comstr += command[i] + " ";
703       }
704       argv[command.size()] = NULL;
705       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
706
707       // On cree une session pour le fils de facon a ce qu'il ne soit pas
708       // detruit lorsque le shell se termine (le shell ouvre une session et
709       // tue tous les process appartenant a la session en quittant)
710       setsid();
711
712       // On ferme les descripteurs de fichiers standards
713       //close(STDIN_FILENO);
714       //close(STDOUT_FILENO);
715       //close(STDERR_FILENO);
716
717       // On execute la commande du fils
718       execv(argv[0], argv);
719       UNDER_LOCK( LOG("*** debug_command = " << strerror(errno)) );
720       // No need to deallocate since nothing happens after a successful exec
721
722       // Normalement on ne devrait jamais arriver ici
723       ofstream file_err("error.log");
724       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
725
726     } catch (GenericException & e) {
727
728       LOG("Caught exception : " << e.type << " : " << e.message);
729     }
730
731     exit(99);
732   }
733
734 #else
735
736   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
737   {
738     Parametre param = _job.getParametre();
739     Parametre::iterator it;
740     PROCESS_INFORMATION pi;
741
742     try {
743
744       // build script file to launch the job and copy it on the server
745       string cmdFilePath = buildCommandFile(_job);
746
747       // define command to submit the job
748       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
749
750       // Build the command string from the command argument vector
751       string comstr;
752       for (unsigned int i=0 ; i<command.size() ; i++) {
753         if (i>0) comstr += " ";
754         comstr += command[i];
755       }
756
757       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
758
759       STARTUPINFO si;
760       ZeroMemory( &si, sizeof(si) );
761       si.cb = sizeof(si);
762       ZeroMemory( &pi, sizeof(pi) );
763
764       // Copy the command to a non-const buffer
765       char * buffer = strdup(comstr.c_str());
766
767       // launch the new process
768       bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
769                                CREATE_NO_WINDOW, NULL, NULL, &si, &pi);
770
771       if (buffer) free(buffer);
772       if (!res) throw RunTimeException("Error while creating new process");
773
774       CloseHandle(pi.hThread);
775
776     } catch (GenericException & e) {
777
778       LOG("Caught exception : " << e.type << " : " << e.message);
779     }
780
781     return pi.hProcess;
782   }
783
784 #endif
785
786
787   void BatchManager_Local::kill_child_on_exit(void * p_pid)
788   {
789 #ifndef WIN32
790     //TODO: porting of following functionality
791     pid_t child = * static_cast<pid_t *>(p_pid);
792
793     // On tue le fils
794     kill(child, SIGTERM);
795
796     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
797     // mais cette option n'est pas implementee pour le moment, car il est
798     // preferable de laisser le process fils se terminer normalement et seul.
799 #endif
800   }
801
802   void BatchManager_Local::delete_on_exit(void * arg)
803   {
804     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
805     delete p_ta;
806   }
807
808   void BatchManager_Local::setFailedOnCancel(void * arg)
809   {
810     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
811     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
812     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
813     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
814
815     // Unlock the master thread. From here, the batch manager instance should not be used.
816     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
817   }
818
819 }