Salome HOME
Fix some bashisms and potential errors in generated shell scripts
[tools/libbatch.git] / src / Local / BatchManager_Local.cxx
1 //  Copyright (C) 2007-2014  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Constants.hxx"
55 #include "IOMutex.hxx"
56 #include "BatchManager_Local.hxx"
57 #include "RunTimeException.hxx"
58 #include "Utils.hxx"
59 #include "Log.hxx"
60
61 using namespace std;
62
63 namespace Batch {
64
65
66   // Constructeur
67   BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
68                                          const char * username,
69                                          CommunicationProtocolType protocolType, const char * mpiImpl)
70     : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
71       _idCounter(0)
72   {
73     pthread_mutex_init(&_threads_mutex, NULL);
74     pthread_cond_init(&_threadSyncCondition, NULL);
75   }
76
77   // Destructeur
78   BatchManager_Local::~BatchManager_Local()
79   {
80     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
81       pthread_mutex_lock(&_threads_mutex);
82       string state = iter->second.param[STATE];
83       if (state != FINISHED && state != FAILED) {
84         UNDER_LOCK( LOG("Warning: Job " << iter->first <<
85                         " is not finished, it will now be canceled."));
86         pthread_cancel(iter->second.thread_id);
87         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
88       }
89       pthread_mutex_unlock(&_threads_mutex);
90     }
91     pthread_mutex_destroy(&_threads_mutex);
92     pthread_cond_destroy(&_threadSyncCondition);
93   }
94
95   // Methode pour le controle des jobs : soumet un job au gestionnaire
96   const JobId BatchManager_Local::submitJob(const Job & job)
97   {
98     // export input files in the working directory of the execution host
99     exportInputFiles(job);
100
101     Job_Local jobLocal = job;
102     Id id = _idCounter++;
103     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
104
105     // Les attributs du thread a sa creation
106     pthread_attr_t thread_attr;
107     pthread_attr_init(&thread_attr);
108     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
109
110     // Creation du thread qui va executer la commande systeme qu'on lui passe
111     pthread_t thread_id;
112     pthread_mutex_lock(&_threads_mutex);
113     int rc = pthread_create(&thread_id,
114       &thread_attr,
115       &ThreadAdapter::run,
116       static_cast<void *>(p_ta));
117
118     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
119     pthread_attr_destroy(&thread_attr);
120
121     if (rc != 0) {
122       pthread_mutex_unlock(&_threads_mutex);
123       throw RunTimeException("Can't create new thread in BatchManager_Local");
124     }
125
126     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
127     pthread_mutex_unlock(&_threads_mutex);
128
129     ostringstream id_sst;
130     id_sst << id;
131     return JobId(this, id_sst.str());
132   }
133
134   // Methode pour le controle des jobs : retire un job du gestionnaire
135   void BatchManager_Local::deleteJob(const JobId & jobid)
136   {
137     Id id;
138
139     istringstream iss(jobid.getReference());
140     iss >> id;
141
142     // @@@ --------> SECTION CRITIQUE <-------- @@@
143     pthread_mutex_lock(&_threads_mutex);
144     bool idFound = (_threads.find(id) != _threads.end());
145     if (idFound) {
146       string state = _threads[id].param[STATE];
147       if (state != FINISHED && state != FAILED) {
148         pthread_cancel(_threads[id].thread_id);
149         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
150       } else {
151         LOG("Cannot delete job " << jobid.getReference() << ". Job is already finished.");
152       }
153     }
154     pthread_mutex_unlock(&_threads_mutex);
155     // @@@ --------> SECTION CRITIQUE <-------- @@@
156
157     if (!idFound)
158       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
159   }
160
161   // Methode pour le controle des jobs : suspend un job en file d'attente
162   void BatchManager_Local::holdJob(const JobId & jobid)
163   {
164     Id id;
165     istringstream iss(jobid.getReference());
166     iss >> id;
167
168     UNDER_LOCK( LOG("BatchManager is sending HOLD command to the thread " << id) );
169
170     // On introduit une commande dans la queue du thread
171     // @@@ --------> SECTION CRITIQUE <-------- @@@
172     pthread_mutex_lock(&_threads_mutex);
173     if (_threads.find(id) != _threads.end())
174       _threads[id].command_queue.push(HOLD);
175     pthread_mutex_unlock(&_threads_mutex);
176     // @@@ --------> SECTION CRITIQUE <-------- @@@
177   }
178
179   // Methode pour le controle des jobs : relache un job suspendu
180   void BatchManager_Local::releaseJob(const JobId & jobid)
181   {
182     Id id;
183     istringstream iss(jobid.getReference());
184     iss >> id;
185
186     UNDER_LOCK( LOG("BatchManager is sending RELEASE command to the thread " << id) );
187
188     // On introduit une commande dans la queue du thread
189     // @@@ --------> SECTION CRITIQUE <-------- @@@
190     pthread_mutex_lock(&_threads_mutex);
191     if (_threads.find(id) != _threads.end())
192       _threads[id].command_queue.push(RELEASE);
193     pthread_mutex_unlock(&_threads_mutex);
194     // @@@ --------> SECTION CRITIQUE <-------- @@@
195   }
196
197   // Methode pour le controle des jobs : renvoie l'etat du job
198   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
199   {
200     Id id;
201     istringstream iss(jobid.getReference());
202     iss >> id;
203
204     Parametre param;
205     Environnement env;
206
207     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
208     // @@@ --------> SECTION CRITIQUE <-------- @@@
209     pthread_mutex_lock(&_threads_mutex);
210     std::map<Id, Child >::iterator pos = _threads.find(id);
211     bool found = (pos != _threads.end());
212     if (found) {
213       param = pos->second.param;
214       env   = pos->second.env;
215     }
216     pthread_mutex_unlock(&_threads_mutex);
217     // @@@ --------> SECTION CRITIQUE <-------- @@@
218     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
219
220     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
221
222     JobInfo_Local ji(param, env);
223     return ji;
224   }
225
226
227   // Ce manager ne peut pas reprendre un job
228   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
229   // au JobId
230   const Batch::JobId
231   BatchManager_Local::addJob(const Batch::Job & job, const std::string & reference)
232   {
233     return JobId(this, "undefined");
234   }
235
236   // Methode pour le controle des jobs : teste si un job est present en machine
237   bool BatchManager_Local::isRunning(const JobId & jobid)
238   {
239     Id id;
240     istringstream iss(jobid.getReference());
241     iss >> id;
242
243     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
244     // @@@ --------> SECTION CRITIQUE <-------- @@@
245     pthread_mutex_lock(&_threads_mutex);
246     bool running = (_threads[id].param[STATE].str() == RUNNING);
247     pthread_mutex_unlock(&_threads_mutex);
248     // @@@ --------> SECTION CRITIQUE <-------- @@@
249     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
250
251     return running;
252   }
253
254   string BatchManager_Local::ThreadAdapter::buildCommandFile(const Job_Local & job)
255   {
256     Parametre param = job.getParametre();
257
258     // Mandatory parameters
259     string workDir = "";
260     if (param.find(WORKDIR) != param.end())
261       workDir = param[WORKDIR].str();
262     else
263       throw RunTimeException("param[WORKDIR] is not defined. Please define it, cannot submit this job.");
264     string fileToExecute = "";
265     if (param.find(EXECUTABLE) != param.end())
266       fileToExecute = param[EXECUTABLE].str();
267     else
268       throw RunTimeException("param[EXECUTABLE] is not defined. Please define it, cannot submit this job.");
269
270     string::size_type p1 = fileToExecute.find_last_of("/");
271     string::size_type p2 = fileToExecute.find_last_of(".");
272     string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1);
273     string fileNameToExecute = fileToExecute.substr(p1+1);
274     string remotePath = workDir + "/" + rootNameToExecute + "_launch_job";
275
276     // Create batch submit file
277     ofstream tempOutputFile;
278     string tmpFileName = Utils::createAndOpenTemporaryFile("launch-job", tempOutputFile);
279
280 #ifdef WIN32
281     if (_bm._protocol.getType() == SH) {
282       char drive[_MAX_DRIVE];
283       _splitpath(workDir.c_str(), drive, NULL, NULL, NULL);
284       if (strlen(drive) > 0) tempOutputFile << drive << endl;
285       tempOutputFile << "cd " << Utils::fixPath(workDir) << endl;
286       // Define environment for the job
287       Environnement env = job.getEnvironnement();
288       for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
289         tempOutputFile << "set " << iter->first << "=" << iter->second << endl;
290       }
291       // Launch the executable
292       tempOutputFile << fileNameToExecute;
293       if (param.find(ARGUMENTS) != param.end()) {
294         Versatile V = param[ARGUMENTS];
295         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
296           StringType argt = * static_cast<StringType *>(*it);
297           string     arg  = argt;
298           tempOutputFile << " " << arg;
299         }
300       }
301       remotePath += ".bat";
302     } else {
303 #endif
304
305     tempOutputFile << "#!/bin/sh -f" << endl;
306     tempOutputFile << "cd " << workDir << endl;
307
308     // Optional parameters (system limits on the job process)
309     if (param.find(MAXCPUTIME) != param.end()) {
310       long maxcputime = (long)param[MAXCPUTIME] * 60;
311       tempOutputFile << "ulimit -H -t " << maxcputime << endl;
312     }
313
314     if (param.find(MAXDISKSIZE) != param.end()) {
315       long maxdisksize = (long)param[MAXDISKSIZE] * 1024;
316       tempOutputFile << "ulimit -H -f " << maxdisksize << endl;
317     }
318
319     if (param.find(MAXRAMSIZE) != param.end()) {
320       long maxramsize = (long)param[MAXRAMSIZE] * 1024;
321       tempOutputFile << "ulimit -H -v " << maxramsize << endl;
322     }
323
324     // Number of cores to use
325     int nbproc = 1;
326     if (param.find(NBPROC) != param.end())
327       nbproc = param[NBPROC];
328
329     // Define environment for the job
330     Environnement env = job.getEnvironnement();
331     for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
332       tempOutputFile << "export " << iter->first << "=" << iter->second << endl;
333     }
334
335     // generate nodes file (one line per required proc)
336     tempOutputFile << "LIBBATCH_NODEFILE=$(mktemp nodefile-XXXXXXXXXX)" << endl;
337     tempOutputFile << "i=" << nbproc << endl;
338     tempOutputFile << "hn=$(hostname)" << endl;
339     tempOutputFile << "{" << endl;
340     tempOutputFile << "while [ $i -gt 0 ]" << endl;
341     tempOutputFile << "do" << endl;
342     tempOutputFile << "    echo \"$hn\"" << endl;
343     tempOutputFile << "    i=$((i-1))" << endl;
344     tempOutputFile << "done" << endl;
345     tempOutputFile << "} > \"$LIBBATCH_NODEFILE\"" << endl;
346     tempOutputFile << "export LIBBATCH_NODEFILE" << endl;
347
348     // Launch the executable
349     tempOutputFile << "./" + fileNameToExecute;
350     if (param.find(ARGUMENTS) != param.end()) {
351       Versatile V = param[ARGUMENTS];
352       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
353         StringType argt = * static_cast<StringType *>(*it);
354         string     arg  = argt;
355         tempOutputFile << " " << arg;
356       }
357     }
358
359     // Standard input and output
360     if (param.find(INFILE) != param.end()) {
361       Versatile V = param[INFILE];
362       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
363         Couple cpl = * static_cast<CoupleType*>(*it);
364         string remote = cpl.getRemote();
365         if (remote == "stdin")
366           tempOutputFile << " <stdin";
367       }
368     }
369
370     string stdoutFile = workDir + "/logs/output.log." + rootNameToExecute;
371     string stderrFile = workDir + "/logs/error.log." + rootNameToExecute;
372     if (param.find(OUTFILE) != param.end()) {
373       Versatile V = param[OUTFILE];
374       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
375         Couple cpl = * static_cast<CoupleType*>(*it);
376         string remote = cpl.getRemote();
377         if (remote == "stdout") stdoutFile = "stdout";
378         if (remote == "stderr") stderrFile = "stderr";
379       }
380     }
381     tempOutputFile << " 1>" << stdoutFile << " 2>" << stderrFile << endl;
382
383     // Remove the node file
384     tempOutputFile << "rm \"$LIBBATCH_NODEFILE\"" << endl;
385
386 #ifdef WIN32
387     }
388 #endif
389
390     tempOutputFile.flush();
391     tempOutputFile.close();
392
393     LOG("Batch script file generated is: " << tmpFileName);
394
395     Utils::chmod(tmpFileName.c_str(), 0x1ED);
396     int status = _bm._protocol.copyFile(tmpFileName, "", "",
397                                         remotePath, _bm._hostname, _bm._username);
398     if (status)
399       throw RunTimeException("Cannot copy command file on host " + _bm._hostname);
400
401 #ifdef WIN32
402     if (_bm._protocol.getType() != SH) {
403       // On Windows, we make the remote file executable afterward because
404       // pscp does not preserve access permissions on files
405       string subCommand = string("chmod u+x ") + remotePath;
406       string command = _bm._protocol.getExecCommand(subCommand, _bm._hostname, _bm._username);
407       LOG(command);
408       status = system(command.c_str());
409       if (status) {
410         std::ostringstream oss;
411         oss << "Cannot change permissions of file " << remotePath << " on host " << _bm._hostname;
412         oss << ". Return status is " << status;
413         throw RunTimeException(oss.str());
414       }
415     }
416 #endif
417
418     return remotePath;
419   }
420
421
422
423   // Constructeur de la classe ThreadAdapter
424   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
425   _bm(bm), _job(job), _id(id)
426   {
427     // Nothing to do
428   }
429
430
431
432   // Methode d'execution du thread
433   void * BatchManager_Local::ThreadAdapter::run(void * arg)
434   {
435     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
436
437 #ifndef WIN32
438     // On bloque tous les signaux pour ce thread
439     sigset_t setmask;
440     sigfillset(&setmask);
441     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
442 #endif
443
444     // On autorise la terminaison differee du thread
445     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
446     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
447     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
448
449     // On enregistre la fonction de suppression du fils en cas d'arret du thread
450     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
451     // sera prise en compte par pthread_testcancel()
452     Process child;
453     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
454     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
455     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
456
457     // On forke/exec un nouveau process pour pouvoir controler le fils
458     // (plus finement qu'avec un appel system)
459     // int rc = system(commande.c_str());
460     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
461     //execv("/usr/bin/ssh", parmList);
462 #ifdef WIN32
463     child = p_ta->launchWin32ChildProcess();
464     p_ta->pere(child);
465 #else
466     child = fork();
467     if (child < 0) { // erreur
468       UNDER_LOCK( LOG("Fork impossible (rc=" << child << ")") );
469
470     } else if (child > 0) { // pere
471       p_ta->pere(child);
472
473     } else { // fils
474       p_ta->fils();
475     }
476 #endif
477
478     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
479
480     // Set the job state to FINISHED or FAILED
481     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
482
483     // On retire la fonction de nettoyage de la memoire
484     pthread_cleanup_pop(0);
485
486     // On retire la fonction de suppression du fils
487     pthread_cleanup_pop(0);
488
489     // remove setFailedOnCancel function from cancel stack
490     pthread_cleanup_pop(0);
491
492     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
493
494     // On invoque la fonction de nettoyage de la memoire
495     delete_on_exit(arg);
496
497     UNDER_LOCK( LOG("Father is leaving") );
498     pthread_exit(NULL);
499     return NULL;
500   }
501
502
503
504
505   void BatchManager_Local::ThreadAdapter::pere(Process child)
506   {
507     time_t child_starttime = time(NULL);
508
509     // On enregistre le fils dans la table des threads
510     pthread_t thread_id = pthread_self();
511
512     Parametre param   = _job.getParametre();
513     Environnement env = _job.getEnvironnement();
514
515     ostringstream id_sst;
516     id_sst << _id;
517     param[ID]         = id_sst.str();
518     param[STATE]      = Batch::RUNNING;
519
520     _bm._threads[_id].thread_id = thread_id;
521 #ifndef WIN32
522     _bm._threads[_id].pid       = child;
523 #endif
524     _bm._threads[_id].hasFailed = false;
525     _bm._threads[_id].param     = param;
526     _bm._threads[_id].env       = env;
527     _bm._threads[_id].command_queue.push(NOP);
528
529     // Unlock the master thread. From here, all shared variables must be protected
530     // from concurrent access
531     pthread_cond_signal(&_bm._threadSyncCondition);
532
533
534     // on boucle en attendant que le fils ait termine
535     while (1) {
536 #ifdef WIN32
537       DWORD exitCode;
538       GetExitCodeProcess(child, &exitCode);
539       if (exitCode != STILL_ACTIVE) {
540         UNDER_LOCK( LOG("Father sees his child is DONE: exit code = " << exitCode) );
541         break;
542       }
543 #else
544       int child_rc = 0;
545       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
546       if (child_wait_rc > 0) {
547          UNDER_LOCK( LOG("Status is: " << WIFEXITED( child_rc)) );
548          UNDER_LOCK( LOG("Status is: " << WEXITSTATUS( child_rc)) );
549          UNDER_LOCK( LOG("Status is: " << WIFSIGNALED( child_rc)) );
550          UNDER_LOCK( LOG("Status is: " << WTERMSIG( child_rc)) );
551          UNDER_LOCK( LOG("Status is: " << WCOREDUMP( child_rc)) );
552          UNDER_LOCK( LOG("Status is: " << WIFSTOPPED( child_rc)) );
553          UNDER_LOCK( LOG("Status is: " << WSTOPSIG( child_rc)) );
554 #ifdef WIFCONTINUED
555          UNDER_LOCK( LOG("Status is: " << WIFCONTINUED( child_rc)) ); // not compilable on sarge
556 #endif
557         if (WIFSTOPPED(child_rc)) {
558           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
559           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
560           // desactive car s'il est possible de detecter l'arret d'un process, il est
561           // plus difficile de detecter sa reprise.
562
563           // Le fils est simplement stoppe
564           // @@@ --------> SECTION CRITIQUE <-------- @@@
565           pthread_mutex_lock(&_bm._threads_mutex);
566           _bm._threads[_id].param[STATE] = Batch::PAUSED;
567           pthread_mutex_unlock(&_bm._threads_mutex);
568           // @@@ --------> SECTION CRITIQUE <-------- @@@
569           UNDER_LOCK( LOG("Father sees his child is STOPPED : " << child_wait_rc) );
570
571         }
572         else {
573           // Le fils est termine, on sort de la boucle et du if englobant
574           UNDER_LOCK( LOG("Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")") );
575           break;
576         }
577       }
578       else if (child_wait_rc == -1) {
579         // Le fils a disparu ...
580         // @@@ --------> SECTION CRITIQUE <-------- @@@
581         pthread_mutex_lock(&_bm._threads_mutex);
582         _bm._threads[_id].hasFailed = true;
583         pthread_mutex_unlock(&_bm._threads_mutex);
584         // @@@ --------> SECTION CRITIQUE <-------- @@@
585         UNDER_LOCK( LOG("Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")") );
586         break;
587       }
588 #endif
589
590       // On teste si le thread doit etre detruit
591       pthread_testcancel();
592
593
594
595       // On regarde si le fils n'a pas depasse son temps (wallclock time)
596       time_t child_currenttime = time(NULL);
597       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
598       if (param.find(MAXWALLTIME) != param.end()) {
599         long maxwalltime = param[MAXWALLTIME];
600         //        cout << "child_starttime          = " << child_starttime        << endl
601         //             << "child_currenttime        = " << child_currenttime      << endl
602         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
603         //             << "maxwalltime              = " << maxwalltime            << endl
604         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
605         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
606           UNDER_LOCK( LOG("Father is sending KILL command to the thread " << _id) );
607           // On introduit une commande dans la queue du thread
608           // @@@ --------> SECTION CRITIQUE <-------- @@@
609           pthread_mutex_lock(&_bm._threads_mutex);
610           if (_bm._threads.find(_id) != _bm._threads.end())
611             _bm._threads[_id].command_queue.push(KILL);
612           pthread_mutex_unlock(&_bm._threads_mutex);
613           // @@@ --------> SECTION CRITIQUE <-------- @@@
614
615
616         } else if (child_elapsedtime_minutes > maxwalltime ) {
617           UNDER_LOCK( LOG("Father is sending TERM command to the thread " << _id) );
618           // On introduit une commande dans la queue du thread
619           // @@@ --------> SECTION CRITIQUE <-------- @@@
620           pthread_mutex_lock(&_bm._threads_mutex);
621           if (_bm._threads.find(_id) != _bm._threads.end())
622             _bm._threads[_id].command_queue.push(TERM);
623           pthread_mutex_unlock(&_bm._threads_mutex);
624           // @@@ --------> SECTION CRITIQUE <-------- @@@
625         }
626       }
627
628
629
630       // On regarde s'il y a quelque chose a faire dans la queue de commande
631       // @@@ --------> SECTION CRITIQUE <-------- @@@
632       pthread_mutex_lock(&_bm._threads_mutex);
633       if (_bm._threads.find(_id) != _bm._threads.end()) {
634         while (_bm._threads[_id].command_queue.size() > 0) {
635           Commande cmd = _bm._threads[_id].command_queue.front();
636           _bm._threads[_id].command_queue.pop();
637
638           switch (cmd) {
639     case NOP:
640       UNDER_LOCK( LOG("Father does nothing to his child") );
641       break;
642 #ifndef WIN32
643     case HOLD:
644       UNDER_LOCK( LOG("Father is sending SIGSTOP signal to his child") );
645       kill(child, SIGSTOP);
646       break;
647
648     case RELEASE:
649       UNDER_LOCK( LOG("Father is sending SIGCONT signal to his child") );
650       kill(child, SIGCONT);
651       break;
652
653     case TERM:
654       UNDER_LOCK( LOG("Father is sending SIGTERM signal to his child") );
655       kill(child, SIGTERM);
656       break;
657
658     case KILL:
659       UNDER_LOCK( LOG("Father is sending SIGKILL signal to his child") );
660       kill(child, SIGKILL);
661       break;
662 #endif
663     case ALTER:
664       break;
665
666     default:
667       break;
668           }
669         }
670
671       }
672       pthread_mutex_unlock(&_bm._threads_mutex);
673       // @@@ --------> SECTION CRITIQUE <-------- @@@
674
675       // On fait une petite pause pour ne pas surcharger inutilement le processeur
676 #ifdef WIN32
677       Sleep(1000);
678 #else
679       sleep(1);
680 #endif
681
682     }
683
684   }
685
686
687
688 #ifndef WIN32
689
690   void BatchManager_Local::ThreadAdapter::fils()
691   {
692     Parametre param = _job.getParametre();
693     Parametre::iterator it;
694
695     try {
696
697       // build script file to launch the job and copy it on the server
698       string cmdFilePath = buildCommandFile(_job);
699
700       // define command to submit the job
701       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
702
703       // Build the argument array argv from the command
704       char ** argv = new char * [command.size() + 1];
705       string comstr;
706       for (string::size_type i=0 ; i<command.size() ; i++) {
707         argv[i] = new char[command[i].size() + 1];
708         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
709         comstr += command[i] + " ";
710       }
711       argv[command.size()] = NULL;
712       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
713
714       // On cree une session pour le fils de facon a ce qu'il ne soit pas
715       // detruit lorsque le shell se termine (le shell ouvre une session et
716       // tue tous les process appartenant a la session en quittant)
717       setsid();
718
719       // On ferme les descripteurs de fichiers standards
720       //close(STDIN_FILENO);
721       //close(STDOUT_FILENO);
722       //close(STDERR_FILENO);
723
724       // On execute la commande du fils
725       execv(argv[0], argv);
726       UNDER_LOCK( LOG("*** debug_command = " << strerror(errno)) );
727       // No need to deallocate since nothing happens after a successful exec
728
729       // Normalement on ne devrait jamais arriver ici
730       ofstream file_err("error.log");
731       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
732
733     } catch (GenericException & e) {
734
735       LOG("Caught exception : " << e.type << " : " << e.message);
736     }
737
738     exit(99);
739   }
740
741 #else
742
743   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
744   {
745     Parametre param = _job.getParametre();
746     Parametre::iterator it;
747     PROCESS_INFORMATION pi;
748
749     try {
750
751       // build script file to launch the job and copy it on the server
752       string cmdFilePath = buildCommandFile(_job);
753
754       // define command to submit the job
755       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
756
757       // Build the command string from the command argument vector
758       string comstr;
759       for (unsigned int i=0 ; i<command.size() ; i++) {
760         if (i>0) comstr += " ";
761         comstr += command[i];
762       }
763
764       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
765
766       STARTUPINFO si;
767       ZeroMemory( &si, sizeof(si) );
768       si.cb = sizeof(si);
769       ZeroMemory( &pi, sizeof(pi) );
770
771       // Copy the command to a non-const buffer
772       char * buffer = strdup(comstr.c_str());
773
774       // launch the new process
775       bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
776                                CREATE_NO_WINDOW, NULL, NULL, &si, &pi);
777
778       if (buffer) free(buffer);
779       if (!res) throw RunTimeException("Error while creating new process");
780
781       CloseHandle(pi.hThread);
782
783     } catch (GenericException & e) {
784
785       LOG("Caught exception : " << e.type << " : " << e.message);
786     }
787
788     return pi.hProcess;
789   }
790
791 #endif
792
793
794   void BatchManager_Local::kill_child_on_exit(void * p_pid)
795   {
796 #ifndef WIN32
797     //TODO: porting of following functionality
798     pid_t child = * static_cast<pid_t *>(p_pid);
799
800     // On tue le fils
801     kill(child, SIGTERM);
802
803     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
804     // mais cette option n'est pas implementee pour le moment, car il est
805     // preferable de laisser le process fils se terminer normalement et seul.
806 #endif
807   }
808
809   void BatchManager_Local::delete_on_exit(void * arg)
810   {
811     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
812     delete p_ta;
813   }
814
815   void BatchManager_Local::setFailedOnCancel(void * arg)
816   {
817     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
818     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
819     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
820     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
821
822     // Unlock the master thread. From here, the batch manager instance should not be used.
823     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
824   }
825
826 }