Salome HOME
f913ee653f93fdc5d8b99d4222856b739c25ff7c
[tools/libbatch.git] / src / Local / BatchManager_Local.cxx
1 // Copyright (C) 2007-2015  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #else
45 #include <sys/wait.h>
46 #include <unistd.h>
47 #endif
48 #include <ctime>
49 #include <pthread.h>
50 #include <signal.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include "Constants.hxx"
55 #include "IOMutex.hxx"
56 #include "BatchManager_Local.hxx"
57 #include "RunTimeException.hxx"
58 #include "Utils.hxx"
59 #include "Log.hxx"
60
61 using namespace std;
62
63 namespace Batch {
64
65
66   // Constructeur
67   BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
68                                          const char * username,
69                                          CommunicationProtocolType protocolType, const char * mpiImpl)
70     : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
71       _idCounter(0)
72   {
73     pthread_mutex_init(&_threads_mutex, NULL);
74     pthread_cond_init(&_threadSyncCondition, NULL);
75   }
76
77   // Destructeur
78   BatchManager_Local::~BatchManager_Local()
79   {
80     for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
81       pthread_mutex_lock(&_threads_mutex);
82       string state = iter->second.param[STATE];
83       if (state != FINISHED && state != FAILED) {
84         UNDER_LOCK( LOG("Warning: Job " << iter->first <<
85                         " is not finished, it will now be canceled."));
86         pthread_cancel(iter->second.thread_id);
87         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
88       }
89       pthread_mutex_unlock(&_threads_mutex);
90     }
91     pthread_mutex_destroy(&_threads_mutex);
92     pthread_cond_destroy(&_threadSyncCondition);
93   }
94
95   // Methode pour le controle des jobs : soumet un job au gestionnaire
96   const JobId BatchManager_Local::runJob(const Job & job)
97   {
98     Job_Local jobLocal = job;
99     Id id = _idCounter++;
100     ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
101
102     // Les attributs du thread a sa creation
103     pthread_attr_t thread_attr;
104     pthread_attr_init(&thread_attr);
105     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
106
107     // Creation du thread qui va executer la commande systeme qu'on lui passe
108     pthread_t thread_id;
109     pthread_mutex_lock(&_threads_mutex);
110     int rc = pthread_create(&thread_id,
111       &thread_attr,
112       &ThreadAdapter::run,
113       static_cast<void *>(p_ta));
114
115     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
116     pthread_attr_destroy(&thread_attr);
117
118     if (rc != 0) {
119       pthread_mutex_unlock(&_threads_mutex);
120       throw RunTimeException("Can't create new thread in BatchManager_Local");
121     }
122
123     pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
124     pthread_mutex_unlock(&_threads_mutex);
125
126     ostringstream id_sst;
127     id_sst << id;
128     return JobId(this, id_sst.str());
129   }
130
131   // Methode pour le controle des jobs : retire un job du gestionnaire
132   void BatchManager_Local::deleteJob(const JobId & jobid)
133   {
134     Id id;
135
136     istringstream iss(jobid.getReference());
137     iss >> id;
138
139     // @@@ --------> SECTION CRITIQUE <-------- @@@
140     pthread_mutex_lock(&_threads_mutex);
141     bool idFound = (_threads.find(id) != _threads.end());
142     if (idFound) {
143       string state = _threads[id].param[STATE];
144       if (state != FINISHED && state != FAILED) {
145         pthread_cancel(_threads[id].thread_id);
146         pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
147       } else {
148         LOG("Cannot delete job " << jobid.getReference() << ". Job is already finished.");
149       }
150     }
151     pthread_mutex_unlock(&_threads_mutex);
152     // @@@ --------> SECTION CRITIQUE <-------- @@@
153
154     if (!idFound)
155       throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
156   }
157
158   // Methode pour le controle des jobs : suspend un job en file d'attente
159   void BatchManager_Local::holdJob(const JobId & jobid)
160   {
161     Id id;
162     istringstream iss(jobid.getReference());
163     iss >> id;
164
165     UNDER_LOCK( LOG("BatchManager is sending HOLD command to the thread " << id) );
166
167     // On introduit une commande dans la queue du thread
168     // @@@ --------> SECTION CRITIQUE <-------- @@@
169     pthread_mutex_lock(&_threads_mutex);
170     if (_threads.find(id) != _threads.end())
171       _threads[id].command_queue.push(HOLD);
172     pthread_mutex_unlock(&_threads_mutex);
173     // @@@ --------> SECTION CRITIQUE <-------- @@@
174   }
175
176   // Methode pour le controle des jobs : relache un job suspendu
177   void BatchManager_Local::releaseJob(const JobId & jobid)
178   {
179     Id id;
180     istringstream iss(jobid.getReference());
181     iss >> id;
182
183     UNDER_LOCK( LOG("BatchManager is sending RELEASE command to the thread " << id) );
184
185     // On introduit une commande dans la queue du thread
186     // @@@ --------> SECTION CRITIQUE <-------- @@@
187     pthread_mutex_lock(&_threads_mutex);
188     if (_threads.find(id) != _threads.end())
189       _threads[id].command_queue.push(RELEASE);
190     pthread_mutex_unlock(&_threads_mutex);
191     // @@@ --------> SECTION CRITIQUE <-------- @@@
192   }
193
194   // Methode pour le controle des jobs : renvoie l'etat du job
195   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
196   {
197     Id id;
198     istringstream iss(jobid.getReference());
199     iss >> id;
200
201     Parametre param;
202     Environnement env;
203
204     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
205     // @@@ --------> SECTION CRITIQUE <-------- @@@
206     pthread_mutex_lock(&_threads_mutex);
207     std::map<Id, Child >::iterator pos = _threads.find(id);
208     bool found = (pos != _threads.end());
209     if (found) {
210       param = pos->second.param;
211       env   = pos->second.env;
212     }
213     pthread_mutex_unlock(&_threads_mutex);
214     // @@@ --------> SECTION CRITIQUE <-------- @@@
215     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
216
217     if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
218
219     JobInfo_Local ji(param, env);
220     return ji;
221   }
222
223
224   // Ce manager ne peut pas reprendre un job
225   // On force donc l'état du job Ã  erreur - pour cela on ne donne pas d'Id
226   // au JobId
227   const Batch::JobId
228   BatchManager_Local::addJob(const Batch::Job & job, const std::string & reference)
229   {
230     return JobId(this, "undefined");
231   }
232
233   // Methode pour le controle des jobs : teste si un job est present en machine
234   bool BatchManager_Local::isRunning(const JobId & jobid)
235   {
236     Id id;
237     istringstream iss(jobid.getReference());
238     iss >> id;
239
240     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
241     // @@@ --------> SECTION CRITIQUE <-------- @@@
242     pthread_mutex_lock(&_threads_mutex);
243     bool running = (_threads[id].param[STATE].str() == RUNNING);
244     pthread_mutex_unlock(&_threads_mutex);
245     // @@@ --------> SECTION CRITIQUE <-------- @@@
246     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
247
248     return running;
249   }
250
251   string BatchManager_Local::ThreadAdapter::buildCommandFile(const Job_Local & job)
252   {
253     Parametre param = job.getParametre();
254
255     // Mandatory parameters
256     string workDir = "";
257     if (param.find(WORKDIR) != param.end())
258       workDir = param[WORKDIR].str();
259     else
260       throw RunTimeException("param[WORKDIR] is not defined. Please define it, cannot submit this job.");
261     string fileToExecute = "";
262     if (param.find(EXECUTABLE) != param.end())
263       fileToExecute = param[EXECUTABLE].str();
264     else
265       throw RunTimeException("param[EXECUTABLE] is not defined. Please define it, cannot submit this job.");
266
267     string::size_type p1 = fileToExecute.find_last_of("/");
268     string::size_type p2 = fileToExecute.find_last_of(".");
269     string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1);
270     string fileNameToExecute = fileToExecute.substr(p1+1);
271     string remotePath = workDir + "/" + rootNameToExecute + "_launch_job";
272
273     // Create batch submit file
274     ofstream tempOutputFile;
275     string tmpFileName = Utils::createAndOpenTemporaryFile("launch-job", tempOutputFile);
276
277 #ifdef WIN32
278     if (_bm._protocol.getType() == SH) {
279       char drive[_MAX_DRIVE];
280       _splitpath(workDir.c_str(), drive, NULL, NULL, NULL);
281       if (strlen(drive) > 0) tempOutputFile << drive << endl;
282       tempOutputFile << "cd " << Utils::fixPath(workDir) << endl;
283       // Define environment for the job
284       Environnement env = job.getEnvironnement();
285       for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
286         tempOutputFile << "set " << iter->first << "=" << iter->second << endl;
287       }
288       // Launch the executable
289       tempOutputFile << fileNameToExecute;
290       if (param.find(ARGUMENTS) != param.end()) {
291         Versatile V = param[ARGUMENTS];
292         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
293           StringType argt = * static_cast<StringType *>(*it);
294           string     arg  = argt;
295           tempOutputFile << " " << arg;
296         }
297       }
298       remotePath += ".bat";
299     } else {
300 #endif
301
302     tempOutputFile << "#!/bin/sh -f" << endl;
303     tempOutputFile << "cd " << workDir << endl;
304
305     // Optional parameters (system limits on the job process)
306     if (param.find(MAXCPUTIME) != param.end()) {
307       long maxcputime = (long)param[MAXCPUTIME] * 60;
308       tempOutputFile << "ulimit -H -t " << maxcputime << endl;
309     }
310
311     if (param.find(MAXDISKSIZE) != param.end()) {
312       long maxdisksize = (long)param[MAXDISKSIZE] * 1024;
313       tempOutputFile << "ulimit -H -f " << maxdisksize << endl;
314     }
315
316     if (param.find(MAXRAMSIZE) != param.end()) {
317       long maxramsize = (long)param[MAXRAMSIZE] * 1024;
318       tempOutputFile << "ulimit -H -v " << maxramsize << endl;
319     }
320
321     // Number of cores to use
322     int nbproc = 1;
323     if (param.find(NBPROC) != param.end())
324       nbproc = param[NBPROC];
325
326     // Define environment for the job
327     Environnement env = job.getEnvironnement();
328     for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
329       tempOutputFile << "export " << iter->first << "=" << iter->second << endl;
330     }
331
332     // generate nodes file (one line per required proc)
333     tempOutputFile << "LIBBATCH_NODEFILE=$(mktemp nodefile-XXXXXXXXXX)" << endl;
334     tempOutputFile << "i=" << nbproc << endl;
335     tempOutputFile << "hn=$(hostname)" << endl;
336     tempOutputFile << "{" << endl;
337     tempOutputFile << "while [ $i -gt 0 ]" << endl;
338     tempOutputFile << "do" << endl;
339     tempOutputFile << "    echo \"$hn\"" << endl;
340     tempOutputFile << "    i=$((i-1))" << endl;
341     tempOutputFile << "done" << endl;
342     tempOutputFile << "} > \"$LIBBATCH_NODEFILE\"" << endl;
343     tempOutputFile << "export LIBBATCH_NODEFILE" << endl;
344
345     // Launch the executable
346     tempOutputFile << "./" + fileNameToExecute;
347     if (param.find(ARGUMENTS) != param.end()) {
348       Versatile V = param[ARGUMENTS];
349       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
350         StringType argt = * static_cast<StringType *>(*it);
351         string     arg  = argt;
352         tempOutputFile << " " << arg;
353       }
354     }
355
356     // Standard input and output
357     if (param.find(INFILE) != param.end()) {
358       Versatile V = param[INFILE];
359       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
360         Couple cpl = * static_cast<CoupleType*>(*it);
361         string remote = cpl.getRemote();
362         if (remote == "stdin")
363           tempOutputFile << " <stdin";
364       }
365     }
366
367     string stdoutFile = workDir + "/logs/output.log." + rootNameToExecute;
368     string stderrFile = workDir + "/logs/error.log." + rootNameToExecute;
369     if (param.find(OUTFILE) != param.end()) {
370       Versatile V = param[OUTFILE];
371       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
372         Couple cpl = * static_cast<CoupleType*>(*it);
373         string remote = cpl.getRemote();
374         if (remote == "stdout") stdoutFile = "stdout";
375         if (remote == "stderr") stderrFile = "stderr";
376       }
377     }
378     tempOutputFile << " 1>" << stdoutFile << " 2>" << stderrFile << endl;
379
380     // Remove the node file
381     tempOutputFile << "rm \"$LIBBATCH_NODEFILE\"" << endl;
382
383 #ifdef WIN32
384     }
385 #endif
386
387     tempOutputFile.flush();
388     tempOutputFile.close();
389
390     LOG("Batch script file generated is: " << tmpFileName);
391
392     Utils::chmod(tmpFileName.c_str(), 0x1ED);
393     int status = _bm._protocol.copyFile(tmpFileName, "", "",
394                                         remotePath, _bm._hostname, _bm._username);
395     if (status)
396       throw RunTimeException("Cannot copy command file on host " + _bm._hostname);
397
398 #ifdef WIN32
399     if (_bm._protocol.getType() != SH) {
400       // On Windows, we make the remote file executable afterward because
401       // pscp does not preserve access permissions on files
402       string subCommand = string("chmod u+x ") + remotePath;
403       string command = _bm._protocol.getExecCommand(subCommand, _bm._hostname, _bm._username);
404       LOG(command);
405       status = system(command.c_str());
406       if (status) {
407         std::ostringstream oss;
408         oss << "Cannot change permissions of file " << remotePath << " on host " << _bm._hostname;
409         oss << ". Return status is " << status;
410         throw RunTimeException(oss.str());
411       }
412     }
413 #endif
414
415     return remotePath;
416   }
417
418
419
420   // Constructeur de la classe ThreadAdapter
421   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
422   _bm(bm), _job(job), _id(id)
423   {
424     // Nothing to do
425   }
426
427
428
429   // Methode d'execution du thread
430   void * BatchManager_Local::ThreadAdapter::run(void * arg)
431   {
432     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
433
434 #ifndef WIN32
435     // On bloque tous les signaux pour ce thread
436     sigset_t setmask;
437     sigfillset(&setmask);
438     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
439 #endif
440
441     // On autorise la terminaison differee du thread
442     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
443     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
444     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
445
446     // On enregistre la fonction de suppression du fils en cas d'arret du thread
447     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
448     // sera prise en compte par pthread_testcancel()
449     Process child;
450     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
451     pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
452     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
453
454     // On forke/exec un nouveau process pour pouvoir controler le fils
455     // (plus finement qu'avec un appel system)
456     // int rc = system(commande.c_str());
457     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
458     //execv("/usr/bin/ssh", parmList);
459 #ifdef WIN32
460     child = p_ta->launchWin32ChildProcess();
461     p_ta->pere(child);
462 #else
463     child = fork();
464     if (child < 0) { // erreur
465       UNDER_LOCK( LOG("Fork impossible (rc=" << child << ")") );
466
467     } else if (child > 0) { // pere
468       p_ta->pere(child);
469
470     } else { // fils
471       p_ta->fils();
472     }
473 #endif
474
475     pthread_mutex_lock(&p_ta->_bm._threads_mutex);
476
477     // Set the job state to FINISHED or FAILED
478     p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
479
480     // On retire la fonction de nettoyage de la memoire
481     pthread_cleanup_pop(0);
482
483     // On retire la fonction de suppression du fils
484     pthread_cleanup_pop(0);
485
486     // remove setFailedOnCancel function from cancel stack
487     pthread_cleanup_pop(0);
488
489     pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
490
491     // On invoque la fonction de nettoyage de la memoire
492     delete_on_exit(arg);
493
494     UNDER_LOCK( LOG("Father is leaving") );
495     pthread_exit(NULL);
496     return NULL;
497   }
498
499
500
501
502   void BatchManager_Local::ThreadAdapter::pere(Process child)
503   {
504     time_t child_starttime = time(NULL);
505
506     // On enregistre le fils dans la table des threads
507     pthread_t thread_id = pthread_self();
508
509     Parametre param   = _job.getParametre();
510     Environnement env = _job.getEnvironnement();
511
512     ostringstream id_sst;
513     id_sst << _id;
514     param[ID]         = id_sst.str();
515     param[STATE]      = Batch::RUNNING;
516
517     _bm._threads[_id].thread_id = thread_id;
518 #ifndef WIN32
519     _bm._threads[_id].pid       = child;
520 #endif
521     _bm._threads[_id].hasFailed = false;
522     _bm._threads[_id].param     = param;
523     _bm._threads[_id].env       = env;
524     _bm._threads[_id].command_queue.push(NOP);
525
526     // Unlock the master thread. From here, all shared variables must be protected
527     // from concurrent access
528     pthread_cond_signal(&_bm._threadSyncCondition);
529
530
531     // on boucle en attendant que le fils ait termine
532     while (1) {
533 #ifdef WIN32
534       DWORD exitCode;
535       GetExitCodeProcess(child, &exitCode);
536       if (exitCode != STILL_ACTIVE) {
537         UNDER_LOCK( LOG("Father sees his child is DONE: exit code = " << exitCode) );
538         break;
539       }
540 #else
541       int child_rc = 0;
542       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
543       if (child_wait_rc > 0) {
544          UNDER_LOCK( LOG("Status is: " << WIFEXITED( child_rc)) );
545          UNDER_LOCK( LOG("Status is: " << WEXITSTATUS( child_rc)) );
546          UNDER_LOCK( LOG("Status is: " << WIFSIGNALED( child_rc)) );
547          UNDER_LOCK( LOG("Status is: " << WTERMSIG( child_rc)) );
548          UNDER_LOCK( LOG("Status is: " << WCOREDUMP( child_rc)) );
549          UNDER_LOCK( LOG("Status is: " << WIFSTOPPED( child_rc)) );
550          UNDER_LOCK( LOG("Status is: " << WSTOPSIG( child_rc)) );
551 #ifdef WIFCONTINUED
552          UNDER_LOCK( LOG("Status is: " << WIFCONTINUED( child_rc)) ); // not compilable on sarge
553 #endif
554         if (WIFSTOPPED(child_rc)) {
555           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
556           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
557           // desactive car s'il est possible de detecter l'arret d'un process, il est
558           // plus difficile de detecter sa reprise.
559
560           // Le fils est simplement stoppe
561           // @@@ --------> SECTION CRITIQUE <-------- @@@
562           pthread_mutex_lock(&_bm._threads_mutex);
563           _bm._threads[_id].param[STATE] = Batch::PAUSED;
564           pthread_mutex_unlock(&_bm._threads_mutex);
565           // @@@ --------> SECTION CRITIQUE <-------- @@@
566           UNDER_LOCK( LOG("Father sees his child is STOPPED : " << child_wait_rc) );
567
568         }
569         else {
570           // Le fils est termine, on sort de la boucle et du if englobant
571           UNDER_LOCK( LOG("Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")") );
572           break;
573         }
574       }
575       else if (child_wait_rc == -1) {
576         // Le fils a disparu ...
577         // @@@ --------> SECTION CRITIQUE <-------- @@@
578         pthread_mutex_lock(&_bm._threads_mutex);
579         _bm._threads[_id].hasFailed = true;
580         pthread_mutex_unlock(&_bm._threads_mutex);
581         // @@@ --------> SECTION CRITIQUE <-------- @@@
582         UNDER_LOCK( LOG("Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")") );
583         break;
584       }
585 #endif
586
587       // On teste si le thread doit etre detruit
588       pthread_testcancel();
589
590
591
592       // On regarde si le fils n'a pas depasse son temps (wallclock time)
593       time_t child_currenttime = time(NULL);
594       long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
595       if (param.find(MAXWALLTIME) != param.end()) {
596         long maxwalltime = param[MAXWALLTIME];
597         //        cout << "child_starttime          = " << child_starttime        << endl
598         //             << "child_currenttime        = " << child_currenttime      << endl
599         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
600         //             << "maxwalltime              = " << maxwalltime            << endl
601         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
602         if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
603           UNDER_LOCK( LOG("Father is sending KILL command to the thread " << _id) );
604           // On introduit une commande dans la queue du thread
605           // @@@ --------> SECTION CRITIQUE <-------- @@@
606           pthread_mutex_lock(&_bm._threads_mutex);
607           if (_bm._threads.find(_id) != _bm._threads.end())
608             _bm._threads[_id].command_queue.push(KILL);
609           pthread_mutex_unlock(&_bm._threads_mutex);
610           // @@@ --------> SECTION CRITIQUE <-------- @@@
611
612
613         } else if (child_elapsedtime_minutes > maxwalltime ) {
614           UNDER_LOCK( LOG("Father is sending TERM command to the thread " << _id) );
615           // On introduit une commande dans la queue du thread
616           // @@@ --------> SECTION CRITIQUE <-------- @@@
617           pthread_mutex_lock(&_bm._threads_mutex);
618           if (_bm._threads.find(_id) != _bm._threads.end())
619             _bm._threads[_id].command_queue.push(TERM);
620           pthread_mutex_unlock(&_bm._threads_mutex);
621           // @@@ --------> SECTION CRITIQUE <-------- @@@
622         }
623       }
624
625
626
627       // On regarde s'il y a quelque chose a faire dans la queue de commande
628       // @@@ --------> SECTION CRITIQUE <-------- @@@
629       pthread_mutex_lock(&_bm._threads_mutex);
630       if (_bm._threads.find(_id) != _bm._threads.end()) {
631         while (_bm._threads[_id].command_queue.size() > 0) {
632           Commande cmd = _bm._threads[_id].command_queue.front();
633           _bm._threads[_id].command_queue.pop();
634
635           switch (cmd) {
636     case NOP:
637       UNDER_LOCK( LOG("Father does nothing to his child") );
638       break;
639 #ifndef WIN32
640     case HOLD:
641       UNDER_LOCK( LOG("Father is sending SIGSTOP signal to his child") );
642       kill(child, SIGSTOP);
643       break;
644
645     case RELEASE:
646       UNDER_LOCK( LOG("Father is sending SIGCONT signal to his child") );
647       kill(child, SIGCONT);
648       break;
649
650     case TERM:
651       UNDER_LOCK( LOG("Father is sending SIGTERM signal to his child") );
652       kill(child, SIGTERM);
653       break;
654
655     case KILL:
656       UNDER_LOCK( LOG("Father is sending SIGKILL signal to his child") );
657       kill(child, SIGKILL);
658       break;
659 #endif
660     case ALTER:
661       break;
662
663     default:
664       break;
665           }
666         }
667
668       }
669       pthread_mutex_unlock(&_bm._threads_mutex);
670       // @@@ --------> SECTION CRITIQUE <-------- @@@
671
672       // On fait une petite pause pour ne pas surcharger inutilement le processeur
673 #ifdef WIN32
674       Sleep(1000);
675 #else
676       sleep(1);
677 #endif
678
679     }
680
681   }
682
683
684
685 #ifndef WIN32
686
687   void BatchManager_Local::ThreadAdapter::fils()
688   {
689     Parametre param = _job.getParametre();
690     Parametre::iterator it;
691
692     try {
693
694       // build script file to launch the job and copy it on the server
695       string cmdFilePath = buildCommandFile(_job);
696
697       // define command to submit the job
698       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
699
700       // Build the argument array argv from the command
701       char ** argv = new char * [command.size() + 1];
702       string comstr;
703       for (string::size_type i=0 ; i<command.size() ; i++) {
704         argv[i] = new char[command[i].size() + 1];
705         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
706         comstr += command[i] + " ";
707       }
708       argv[command.size()] = NULL;
709       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
710
711       // On cree une session pour le fils de facon a ce qu'il ne soit pas
712       // detruit lorsque le shell se termine (le shell ouvre une session et
713       // tue tous les process appartenant a la session en quittant)
714       setsid();
715
716       // On ferme les descripteurs de fichiers standards
717       //close(STDIN_FILENO);
718       //close(STDOUT_FILENO);
719       //close(STDERR_FILENO);
720
721       // On execute la commande du fils
722       execv(argv[0], argv);
723       UNDER_LOCK( LOG("*** debug_command = " << strerror(errno)) );
724       // No need to deallocate since nothing happens after a successful exec
725
726       // Normalement on ne devrait jamais arriver ici
727       ofstream file_err("error.log");
728       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
729
730     } catch (GenericException & e) {
731
732       LOG("Caught exception : " << e.type << " : " << e.message);
733     }
734
735     exit(99);
736   }
737
738 #else
739
740   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
741   {
742     Parametre param = _job.getParametre();
743     Parametre::iterator it;
744     PROCESS_INFORMATION pi;
745
746     try {
747
748       // build script file to launch the job and copy it on the server
749       string cmdFilePath = buildCommandFile(_job);
750
751       // define command to submit the job
752       vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
753
754       // Build the command string from the command argument vector
755       string comstr;
756       for (unsigned int i=0 ; i<command.size() ; i++) {
757         if (i>0) comstr += " ";
758         comstr += command[i];
759       }
760
761       UNDER_LOCK( LOG("*** debug_command = " << comstr) );
762
763       STARTUPINFO si;
764       ZeroMemory( &si, sizeof(si) );
765       si.cb = sizeof(si);
766       ZeroMemory( &pi, sizeof(pi) );
767
768       // Copy the command to a non-const buffer
769       char * buffer = strdup(comstr.c_str());
770
771       // launch the new process
772       bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
773                                CREATE_NO_WINDOW, NULL, NULL, &si, &pi);
774
775       if (buffer) free(buffer);
776       if (!res) throw RunTimeException("Error while creating new process");
777
778       CloseHandle(pi.hThread);
779
780     } catch (GenericException & e) {
781
782       LOG("Caught exception : " << e.type << " : " << e.message);
783     }
784
785     return pi.hProcess;
786   }
787
788 #endif
789
790
791   void BatchManager_Local::kill_child_on_exit(void * p_pid)
792   {
793 #ifndef WIN32
794     //TODO: porting of following functionality
795     pid_t child = * static_cast<pid_t *>(p_pid);
796
797     // On tue le fils
798     kill(child, SIGTERM);
799
800     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
801     // mais cette option n'est pas implementee pour le moment, car il est
802     // preferable de laisser le process fils se terminer normalement et seul.
803 #endif
804   }
805
806   void BatchManager_Local::delete_on_exit(void * arg)
807   {
808     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
809     delete p_ta;
810   }
811
812   void BatchManager_Local::setFailedOnCancel(void * arg)
813   {
814     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
815     pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
816     p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
817     pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
818
819     // Unlock the master thread. From here, the batch manager instance should not be used.
820     pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
821   }
822
823 }