Salome HOME
40ba3fe9fd408136fc658339d9e7c64f14efd322
[tools/libbatch.git] / src / Local / BatchManager_Local.cxx
1 // Copyright (C) 2007-2020  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Constants.hxx"
55 #include "IOMutex.hxx"
56 #include "BatchManager_Local.hxx"
57 #include "RunTimeException.hxx"
58 #include "Utils.hxx"
59 #include "Log.hxx"
60
61 using namespace std;
62
63 namespace Batch {
64
65
66   // Constructeur
67   BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
68                                          const char * username,
69                                          CommunicationProtocolType protocolType, const char * mpiImpl)
70     : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
71       _idCounter(0)
72   {
73     pthread_mutex_init(&_threads_mutex, NULL);
74     pthread_cond_init(&_threadSyncCondition, NULL);
75   }
76
77   // Destructeur
78   BatchManager_Local::~BatchManager_Local()
79   {
80     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
81       pthread_mutex_lock(&_threads_mutex);
82       string state = iter->second.param[STATE];
83       if (state != FINISHED && state != FAILED) {
84         UNDER_LOCK( LOG("Warning: Job " << iter->first <<
85                         " is not finished, it will now be canceled."));
86         pthread_cancel(iter->second.thread_id);
87         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
88       }
89       pthread_mutex_unlock(&_threads_mutex);
90     }
91     pthread_mutex_destroy(&_threads_mutex);
92     pthread_cond_destroy(&_threadSyncCondition);
93   }
94
95   // Methode pour le controle des jobs : soumet un job au gestionnaire
96   const JobId BatchManager_Local::runJob(const Job & job)
97   {
98     Job_Local jobLocal = job;
99     Id id = _idCounter++;
100     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
101
102     // Les attributs du thread a sa creation
103     pthread_attr_t thread_attr;
104     pthread_attr_init(&thread_attr);
105     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
106
107     // Creation du thread qui va executer la commande systeme qu'on lui passe
108     pthread_t thread_id;
109     pthread_mutex_lock(&_threads_mutex);
110     int rc = pthread_create(&thread_id,
111       &thread_attr,
112       &ThreadAdapter::run,
113       static_cast<void *>(p_ta));
114
115     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
116     pthread_attr_destroy(&thread_attr);
117
118     if (rc != 0) {
119       pthread_mutex_unlock(&_threads_mutex);
120       throw RunTimeException("Can't create new thread in BatchManager_Local");
121     }
122
123     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
124     pthread_mutex_unlock(&_threads_mutex);
125
126     ostringstream id_sst;
127     id_sst << id;
128     return JobId(this, id_sst.str());
129   }
130
131   // Methode pour le controle des jobs : retire un job du gestionnaire
132   void BatchManager_Local::deleteJob(const JobId & jobid)
133   {
134     Id id;
135
136     istringstream iss(jobid.getReference());
137     iss >> id;
138
139     // @@@ --------> SECTION CRITIQUE <-------- @@@
140     pthread_mutex_lock(&_threads_mutex);
141     bool idFound = (_threads.find(id) != _threads.end());
142     if (idFound) {
143       string state = _threads[id].param[STATE];
144       if (state != FINISHED && state != FAILED) {
145         pthread_cancel(_threads[id].thread_id);
146         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
147       } else {
148         LOG("Cannot delete job " << jobid.getReference() << ". Job is already finished.");
149       }
150     }
151     pthread_mutex_unlock(&_threads_mutex);
152     // @@@ --------> SECTION CRITIQUE <-------- @@@
153
154     if (!idFound)
155       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
156   }
157
158   // Methode pour le controle des jobs : suspend un job en file d'attente
159   void BatchManager_Local::holdJob(const JobId & jobid)
160   {
161     Id id;
162     istringstream iss(jobid.getReference());
163     iss >> id;
164
165     UNDER_LOCK( LOG("BatchManager is sending HOLD command to the thread " << id) );
166
167     // On introduit une commande dans la queue du thread
168     // @@@ --------> SECTION CRITIQUE <-------- @@@
169     pthread_mutex_lock(&_threads_mutex);
170     if (_threads.find(id) != _threads.end())
171       _threads[id].command_queue.push(HOLD);
172     pthread_mutex_unlock(&_threads_mutex);
173     // @@@ --------> SECTION CRITIQUE <-------- @@@
174   }
175
176   // Methode pour le controle des jobs : relache un job suspendu
177   void BatchManager_Local::releaseJob(const JobId & jobid)
178   {
179     Id id;
180     istringstream iss(jobid.getReference());
181     iss >> id;
182
183     UNDER_LOCK( LOG("BatchManager is sending RELEASE command to the thread " << id) );
184
185     // On introduit une commande dans la queue du thread
186     // @@@ --------> SECTION CRITIQUE <-------- @@@
187     pthread_mutex_lock(&_threads_mutex);
188     if (_threads.find(id) != _threads.end())
189       _threads[id].command_queue.push(RELEASE);
190     pthread_mutex_unlock(&_threads_mutex);
191     // @@@ --------> SECTION CRITIQUE <-------- @@@
192   }
193
194   // Methode pour le controle des jobs : renvoie l'etat du job
195   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
196   {
197     Id id;
198     istringstream iss(jobid.getReference());
199     iss >> id;
200
201     Parametre param;
202     Environnement env;
203
204     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
205     // @@@ --------> SECTION CRITIQUE <-------- @@@
206     pthread_mutex_lock(&_threads_mutex);
207     std::map<Id, Child >::iterator pos = _threads.find(id);
208     bool found = (pos != _threads.end());
209     if (found) {
210       param = pos->second.param;
211       env   = pos->second.env;
212     }
213     pthread_mutex_unlock(&_threads_mutex);
214     // @@@ --------> SECTION CRITIQUE <-------- @@@
215     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
216
217     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
218
219     JobInfo_Local ji(param, env);
220     return ji;
221   }
222
223
224   // Ce manager ne peut pas reprendre un job
225   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
226   // au JobId
227   const Batch::JobId
228   BatchManager_Local::addJob(const Batch::Job & /*job*/, const std::string & /*reference*/)
229   {
230     return JobId(this, "undefined");
231   }
232
233   // Methode pour le controle des jobs : teste si un job est present en machine
234   bool BatchManager_Local::isRunning(const JobId & jobid)
235   {
236     Id id;
237     istringstream iss(jobid.getReference());
238     iss >> id;
239
240     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
241     // @@@ --------> SECTION CRITIQUE <-------- @@@
242     pthread_mutex_lock(&_threads_mutex);
243     bool running = (_threads[id].param[STATE].str() == RUNNING);
244     pthread_mutex_unlock(&_threads_mutex);
245     // @@@ --------> SECTION CRITIQUE <-------- @@@
246     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
247
248     return running;
249   }
250
251   string BatchManager_Local::ThreadAdapter::buildCommandFile(const Job_Local & job)
252   {
253     Parametre param = job.getParametre();
254
255     // Mandatory parameters
256     string workDir = "";
257     if (param.find(WORKDIR) != param.end())
258       workDir = param[WORKDIR].str();
259     else
260       throw RunTimeException("param[WORKDIR] is not defined. Please define it, cannot submit this job.");
261     string fileToExecute = "";
262     if (param.find(EXECUTABLE) != param.end())
263       fileToExecute = param[EXECUTABLE].str();
264     else
265       throw RunTimeException("param[EXECUTABLE] is not defined. Please define it, cannot submit this job.");
266 #ifdef WIN32
267     const char separator = '\\';
268 #else
269     const char separator = '/';
270 #endif
271     string::size_type p1 = fileToExecute.find_last_of(separator);
272     string::size_type p2 = fileToExecute.find_last_of(".");
273     string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1);
274     string fileNameToExecute = fileToExecute.substr(p1+1);
275     string remotePath = workDir + separator + rootNameToExecute + "_launch_job";
276
277     // Create batch submit file
278     ofstream tempOutputFile;
279     string tmpFileName = Utils::createAndOpenTemporaryFile("launch-job", tempOutputFile);
280
281 #ifdef WIN32
282     if (_bm._protocol.getType() == SH) {
283       char drive[_MAX_DRIVE];
284       _splitpath(workDir.c_str(), drive, NULL, NULL, NULL);
285       if (strlen(drive) > 0) tempOutputFile << drive << endl;
286       tempOutputFile << "cd " << Utils::fixPath(workDir) << endl;
287       // Define environment for the job
288       Environnement env = job.getEnvironnement();
289       for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
290         tempOutputFile << "set " << iter->first << "=" << iter->second << endl;
291       }
292       // Launch the executable
293       tempOutputFile << fileNameToExecute;
294       if (param.find(ARGUMENTS) != param.end()) {
295         Versatile V = param[ARGUMENTS];
296         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
297           StringType argt = * static_cast<StringType *>(*it);
298           string     arg  = argt;
299           tempOutputFile << " " << arg;
300         }
301       }
302       remotePath += ".bat";
303     } else {
304 #endif
305
306     tempOutputFile << "#!/bin/sh -f" << endl;
307     tempOutputFile << "cd " << workDir << endl;
308
309     // Optional parameters (system limits on the job process)
310     if (param.find(MAXCPUTIME) != param.end()) {
311       long maxcputime = (long)param[MAXCPUTIME] * 60;
312       tempOutputFile << "ulimit -H -t " << maxcputime << endl;
313     }
314
315     if (param.find(MAXDISKSIZE) != param.end()) {
316       long maxdisksize = (long)param[MAXDISKSIZE] * 1024;
317       tempOutputFile << "ulimit -H -f " << maxdisksize << endl;
318     }
319
320     if (param.find(MAXRAMSIZE) != param.end()) {
321       long maxramsize = (long)param[MAXRAMSIZE] * 1024;
322       tempOutputFile << "ulimit -H -v " << maxramsize << endl;
323     }
324
325     // Number of cores to use
326     int nbproc = 1;
327     if (param.find(NBPROC) != param.end())
328       nbproc = param[NBPROC];
329
330     // Define environment for the job
331     Environnement env = job.getEnvironnement();
332     for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
333       tempOutputFile << "export " << iter->first << "=" << iter->second << endl;
334     }
335
336     // generate nodes file (one line per required proc)
337     tempOutputFile << "LIBBATCH_NODEFILE=$(mktemp nodefile-XXXXXXXXXX)" << endl;
338     tempOutputFile << "i=" << nbproc << endl;
339     tempOutputFile << "hn=$(hostname)" << endl;
340     tempOutputFile << "{" << endl;
341     tempOutputFile << "while [ $i -gt 0 ]" << endl;
342     tempOutputFile << "do" << endl;
343     tempOutputFile << "    echo \"$hn\"" << endl;
344     tempOutputFile << "    i=$((i-1))" << endl;
345     tempOutputFile << "done" << endl;
346     tempOutputFile << "} > \"$LIBBATCH_NODEFILE\"" << endl;
347     tempOutputFile << "export LIBBATCH_NODEFILE" << endl;
348
349     // Launch the executable
350     tempOutputFile << "./" + fileNameToExecute;
351     if (param.find(ARGUMENTS) != param.end()) {
352       Versatile V = param[ARGUMENTS];
353       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
354         StringType argt = * static_cast<StringType *>(*it);
355         string     arg  = argt;
356         tempOutputFile << " " << arg;
357       }
358     }
359
360     // Standard input and output
361     if (param.find(INFILE) != param.end()) {
362       Versatile V = param[INFILE];
363       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
364         Couple cpl = * static_cast<CoupleType*>(*it);
365         string remote = cpl.getRemote();
366         if (remote == "stdin")
367           tempOutputFile << " <stdin";
368       }
369     }
370
371     string stdoutFile = workDir + "/logs/output.log." + rootNameToExecute;
372     string stderrFile = workDir + "/logs/error.log." + rootNameToExecute;
373     if (param.find(OUTFILE) != param.end()) {
374       Versatile V = param[OUTFILE];
375       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
376         Couple cpl = * static_cast<CoupleType*>(*it);
377         string remote = cpl.getRemote();
378         if (remote == "stdout") stdoutFile = "stdout";
379         if (remote == "stderr") stderrFile = "stderr";
380       }
381     }
382     tempOutputFile << " 1>" << stdoutFile << " 2>" << stderrFile << endl;
383
384     // Remove the node file
385     tempOutputFile << "rm \"$LIBBATCH_NODEFILE\"" << endl;
386
387 #ifdef WIN32
388     }
389 #endif
390
391     tempOutputFile.flush();
392     tempOutputFile.close();
393
394     LOG("Batch script file generated is: " << tmpFileName);
395
396     Utils::chmod(tmpFileName.c_str(), 0x1ED);
397     int status = _bm._protocol.copyFile(tmpFileName, "", "",
398                                         remotePath, _bm._hostname, _bm._username);
399     if (status)
400       throw RunTimeException("Cannot copy command file on host " + _bm._hostname);
401
402 #ifdef WIN32
403     if (_bm._protocol.getType() != SH) {
404       // On Windows, we make the remote file executable afterward because
405       // pscp does not preserve access permissions on files
406       string subCommand = string("chmod u+x ") + remotePath;
407       string command = _bm._protocol.getExecCommand(subCommand, _bm._hostname, _bm._username);
408       LOG(command);
409       status = system(command.c_str());
410       if (status) {
411         std::ostringstream oss;
412         oss << "Cannot change permissions of file " << remotePath << " on host " << _bm._hostname;
413         oss << ". Return status is " << status;
414         throw RunTimeException(oss.str());
415       }
416     }
417 #endif
418
419     return remotePath;
420   }
421
422
423
424   // Constructeur de la classe ThreadAdapter
425   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
426   _bm(bm), _job(job), _id(id)
427   {
428     // Nothing to do
429   }
430
431
432
433   // Methode d'execution du thread
434   void * BatchManager_Local::ThreadAdapter::run(void * arg)
435   {
436     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
437
438 #ifndef WIN32
439     // On bloque tous les signaux pour ce thread
440     sigset_t setmask;
441     sigfillset(&setmask);
442     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
443 #endif
444
445     // On autorise la terminaison differee du thread
446     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
447     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
448     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
449
450     // On enregistre la fonction de suppression du fils en cas d'arret du thread
451     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
452     // sera prise en compte par pthread_testcancel()
453     Process child;
454     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
455     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
456     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
457
458     // On forke/exec un nouveau process pour pouvoir controler le fils
459     // (plus finement qu'avec un appel system)
460     // int rc = system(commande.c_str());
461     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
462     //execv("/usr/bin/ssh", parmList);
463 #ifdef WIN32
464     child = p_ta->launchWin32ChildProcess();
465     p_ta->pere(child);
466 #else
467     // LOCK&UNLOCK needed to avoid potential deadlock if a thread holds the lock
468     LOCK_IO
469     child = fork();
470     UNLOCK_IO
471     if (child < 0) { // erreur
472       UNDER_LOCK( LOG("Fork impossible (rc=" << child << ")") );
473
474     } else if (child > 0) { // pere
475       p_ta->pere(child);
476
477     } else { // fils
478       p_ta->fils();
479     }
480 #endif
481
482     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
483
484     // Set the job state to FINISHED or FAILED
485     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
486
487     // On retire la fonction de nettoyage de la memoire
488     pthread_cleanup_pop(0);
489
490     // On retire la fonction de suppression du fils
491     pthread_cleanup_pop(0);
492
493     // remove setFailedOnCancel function from cancel stack
494     pthread_cleanup_pop(0);
495
496     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
497
498     // On invoque la fonction de nettoyage de la memoire
499     delete_on_exit(arg);
500
501     UNDER_LOCK( LOG("Father is leaving") );
502     pthread_exit(NULL);
503     return NULL;
504   }
505
506
507
508
509   void BatchManager_Local::ThreadAdapter::pere(Process child)
510   {
511     time_t child_starttime = time(NULL);
512
513     // On enregistre le fils dans la table des threads
514     pthread_t thread_id = pthread_self();
515
516     Parametre param   = _job.getParametre();
517     Environnement env = _job.getEnvironnement();
518
519     ostringstream id_sst;
520     id_sst << _id;
521     param[ID]         = id_sst.str();
522     param[STATE]      = Batch::RUNNING;
523
524     _bm._threads[_id].thread_id = thread_id;
525 #ifndef WIN32
526     _bm._threads[_id].pid       = child;
527 #endif
528     _bm._threads[_id].hasFailed = false;
529     _bm._threads[_id].param     = param;
530     _bm._threads[_id].env       = env;
531     _bm._threads[_id].command_queue.push(NOP);
532
533     // Unlock the master thread. From here, all shared variables must be protected
534     // from concurrent access
535     pthread_cond_signal(&_bm._threadSyncCondition);
536
537
538     // on boucle en attendant que le fils ait termine
539     while (1) {
540 #ifdef WIN32
541       DWORD exitCode;
542       GetExitCodeProcess(child, &exitCode);
543       if (exitCode != STILL_ACTIVE) {
544         UNDER_LOCK( LOG("Father sees his child is DONE: exit code = " << exitCode) );
545         break;
546       }
547 #else
548       int child_rc = 0;
549       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
550       if (child_wait_rc > 0) {
551          UNDER_LOCK( LOG("Status is: " << WIFEXITED( child_rc)) );
552          UNDER_LOCK( LOG("Status is: " << WEXITSTATUS( child_rc)) );
553          UNDER_LOCK( LOG("Status is: " << WIFSIGNALED( child_rc)) );
554          UNDER_LOCK( LOG("Status is: " << WTERMSIG( child_rc)) );
555          UNDER_LOCK( LOG("Status is: " << WCOREDUMP( child_rc)) );
556          UNDER_LOCK( LOG("Status is: " << WIFSTOPPED( child_rc)) );
557          UNDER_LOCK( LOG("Status is: " << WSTOPSIG( child_rc)) );
558 #ifdef WIFCONTINUED
559          UNDER_LOCK( LOG("Status is: " << WIFCONTINUED( child_rc)) ); // not compilable on sarge
560 #endif
561         if (WIFSTOPPED(child_rc)) {
562           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
563           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
564           // desactive car s'il est possible de detecter l'arret d'un process, il est
565           // plus difficile de detecter sa reprise.
566
567           // Le fils est simplement stoppe
568           // @@@ --------> SECTION CRITIQUE <-------- @@@
569           pthread_mutex_lock(&_bm._threads_mutex);
570           _bm._threads[_id].param[STATE] = Batch::PAUSED;
571           pthread_mutex_unlock(&_bm._threads_mutex);
572           // @@@ --------> SECTION CRITIQUE <-------- @@@
573           UNDER_LOCK( LOG("Father sees his child is STOPPED : " << child_wait_rc) );
574
575         }
576         else {
577           // Le fils est termine, on sort de la boucle et du if englobant
578           UNDER_LOCK( LOG("Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")") );
579           break;
580         }
581       }
582       else if (child_wait_rc == -1) {
583         // Le fils a disparu ...
584         // @@@ --------> SECTION CRITIQUE <-------- @@@
585         pthread_mutex_lock(&_bm._threads_mutex);
586         _bm._threads[_id].hasFailed = true;
587         pthread_mutex_unlock(&_bm._threads_mutex);
588         // @@@ --------> SECTION CRITIQUE <-------- @@@
589         UNDER_LOCK( LOG("Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")") );
590         break;
591       }
592 #endif
593
594       // On teste si le thread doit etre detruit
595       pthread_testcancel();
596
597
598
599       // On regarde si le fils n'a pas depasse son temps (wallclock time)
600       time_t child_currenttime = time(NULL);
601       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
602       if (param.find(MAXWALLTIME) != param.end()) {
603         long maxwalltime = param[MAXWALLTIME];
604         //        cout << "child_starttime          = " << child_starttime        << endl
605         //             << "child_currenttime        = " << child_currenttime      << endl
606         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
607         //             << "maxwalltime              = " << maxwalltime            << endl
608         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
609         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
610           UNDER_LOCK( LOG("Father is sending KILL command to the thread " << _id) );
611           // On introduit une commande dans la queue du thread
612           // @@@ --------> SECTION CRITIQUE <-------- @@@
613           pthread_mutex_lock(&_bm._threads_mutex);
614           if (_bm._threads.find(_id) != _bm._threads.end())
615             _bm._threads[_id].command_queue.push(KILL);
616           pthread_mutex_unlock(&_bm._threads_mutex);
617           // @@@ --------> SECTION CRITIQUE <-------- @@@
618
619
620         } else if (child_elapsedtime_minutes > maxwalltime ) {
621           UNDER_LOCK( LOG("Father is sending TERM command to the thread " << _id) );
622           // On introduit une commande dans la queue du thread
623           // @@@ --------> SECTION CRITIQUE <-------- @@@
624           pthread_mutex_lock(&_bm._threads_mutex);
625           if (_bm._threads.find(_id) != _bm._threads.end())
626             _bm._threads[_id].command_queue.push(TERM);
627           pthread_mutex_unlock(&_bm._threads_mutex);
628           // @@@ --------> SECTION CRITIQUE <-------- @@@
629         }
630       }
631
632
633
634       // On regarde s'il y a quelque chose a faire dans la queue de commande
635       // @@@ --------> SECTION CRITIQUE <-------- @@@
636       pthread_mutex_lock(&_bm._threads_mutex);
637       if (_bm._threads.find(_id) != _bm._threads.end()) {
638         while (_bm._threads[_id].command_queue.size() > 0) {
639           Commande cmd = _bm._threads[_id].command_queue.front();
640           _bm._threads[_id].command_queue.pop();
641
642           switch (cmd) {
643     case NOP:
644       UNDER_LOCK( LOG("Father does nothing to his child") );
645       break;
646 #ifndef WIN32
647     case HOLD:
648       UNDER_LOCK( LOG("Father is sending SIGSTOP signal to his child") );
649       kill(child, SIGSTOP);
650       break;
651
652     case RELEASE:
653       UNDER_LOCK( LOG("Father is sending SIGCONT signal to his child") );
654       kill(child, SIGCONT);
655       break;
656
657     case TERM:
658       UNDER_LOCK( LOG("Father is sending SIGTERM signal to his child") );
659       kill(child, SIGTERM);
660       break;
661
662     case KILL:
663       UNDER_LOCK( LOG("Father is sending SIGKILL signal to his child") );
664       kill(child, SIGKILL);
665       break;
666 #endif
667     case ALTER:
668       break;
669
670     default:
671       break;
672           }
673         }
674
675       }
676       pthread_mutex_unlock(&_bm._threads_mutex);
677       // @@@ --------> SECTION CRITIQUE <-------- @@@
678
679       // On fait une petite pause pour ne pas surcharger inutilement le processeur
680 #ifdef WIN32
681       Sleep(1000);
682 #else
683       sleep(1);
684 #endif
685
686     }
687
688   }
689
690
691
692 #ifndef WIN32
693
694   void BatchManager_Local::ThreadAdapter::fils()
695   {
696     Parametre param = _job.getParametre();
697     Parametre::iterator it;
698
699     try {
700
701       // build script file to launch the job and copy it on the server
702       string cmdFilePath = buildCommandFile(_job);
703
704       // define command to submit the job
705       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
706
707       // Build the argument array argv from the command
708       char ** argv = new char * [command.size() + 1];
709       string comstr;
710       for (string::size_type i=0 ; i<command.size() ; i++) {
711         argv[i] = new char[command[i].size() + 1];
712         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
713         comstr += command[i] + " ";
714       }
715       argv[command.size()] = NULL;
716       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
717
718       // On cree une session pour le fils de facon a ce qu'il ne soit pas
719       // detruit lorsque le shell se termine (le shell ouvre une session et
720       // tue tous les process appartenant a la session en quittant)
721       setsid();
722
723       // On ferme les descripteurs de fichiers standards
724       //close(STDIN_FILENO);
725       //close(STDOUT_FILENO);
726       //close(STDERR_FILENO);
727
728       // On execute la commande du fils
729       execv(argv[0], argv);
730       UNDER_LOCK( LOG("*** debug_command = " << strerror(errno)) );
731       // No need to deallocate since nothing happens after a successful exec
732
733       // Normalement on ne devrait jamais arriver ici
734       ofstream file_err("error.log");
735       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
736
737     } catch (GenericException & e) {
738
739       LOG("Caught exception : " << e.type << " : " << e.message);
740     }
741
742     exit(99);
743   }
744
745 #else
746
747   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
748   {
749     Parametre param = _job.getParametre();
750     Parametre::iterator it;
751     PROCESS_INFORMATION pi;
752
753     try {
754
755       // build script file to launch the job and copy it on the server
756       string cmdFilePath = buildCommandFile(_job);
757
758       // define command to submit the job
759       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
760
761       // Build the command string from the command argument vector
762       string comstr;
763       for (unsigned int i=0 ; i<command.size() ; i++) {
764         if (i>0) comstr += " ";
765         comstr += command[i];
766       }
767
768       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
769
770       STARTUPINFO si;
771       ZeroMemory( &si, sizeof(si) );
772       si.cb = sizeof(si);
773       ZeroMemory( &pi, sizeof(pi) );
774
775       // Copy the command to a non-const buffer
776       char * buffer = strdup(comstr.c_str());
777
778       // launch the new process
779       bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
780                                CREATE_NO_WINDOW, NULL, NULL, &si, &pi);
781
782       if (buffer) free(buffer);
783       if (!res) throw RunTimeException("Error while creating new process");
784
785       CloseHandle(pi.hThread);
786
787     } catch (GenericException & e) {
788
789       LOG("Caught exception : " << e.type << " : " << e.message);
790     }
791
792     return pi.hProcess;
793   }
794
795 #endif
796
797
798   void BatchManager_Local::kill_child_on_exit(void * p_pid)
799   {
800 #ifndef WIN32
801     //TODO: porting of following functionality
802     pid_t child = * static_cast<pid_t *>(p_pid);
803
804     // On tue le fils
805     kill(child, SIGTERM);
806
807     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
808     // mais cette option n'est pas implementee pour le moment, car il est
809     // preferable de laisser le process fils se terminer normalement et seul.
810 #endif
811   }
812
813   void BatchManager_Local::delete_on_exit(void * arg)
814   {
815     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
816     delete p_ta;
817   }
818
819   void BatchManager_Local::setFailedOnCancel(void * arg)
820   {
821     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
822     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
823     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
824     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
825
826     // Unlock the master thread. From here, the batch manager instance should not be used.
827     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
828   }
829
830 }