Salome HOME
4166e3f80a2e5bcfced8ef3c4ad15a50de93dd5c
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #include "Batch_RunTimeException.hxx"
45 #else
46 #include <sys/wait.h>
47 #include <unistd.h>
48 #endif
49 #include <ctime>
50 #include <pthread.h>
51 #include <signal.h>
52 #include <errno.h>
53 #include <string.h>
54 #include "Batch_IOMutex.hxx"
55 #include "Batch_BatchManager_Local.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65     : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
66       _protocol(CommunicationProtocol::getInstance(protocolType)),
67       _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
68   {
69     pthread_mutex_init(&_threads_mutex, NULL);
70     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
71     pthread_cond_init(&_thread_id_id_association_cond, NULL);
72   }
73
74   // Destructeur
75   BatchManager_Local::~BatchManager_Local()
76   {
77     pthread_mutex_destroy(&_threads_mutex);
78     pthread_mutex_destroy(&_thread_id_id_association_mutex);
79     pthread_cond_destroy(&_thread_id_id_association_cond);
80   }
81
82   const CommunicationProtocol & BatchManager_Local::getProtocol() const
83   {
84     return _protocol;
85   }
86
87   // Methode pour le controle des jobs : soumet un job au gestionnaire
88   const JobId BatchManager_Local::submitJob(const Job & job)
89   {
90     Job_Local jobLocal = job;
91
92     pthread_t thread_id = submit(jobLocal);
93
94     ostringstream oss;
95     oss << getIdByThread_id(thread_id);
96
97     JobId id(this, oss.str());
98
99     return id;
100   }
101
102   // Methode pour le controle des jobs : retire un job du gestionnaire
103   void BatchManager_Local::deleteJob(const JobId & jobid)
104   {
105     Id id;
106
107     istringstream iss(jobid.getReference());
108     iss >> id;
109
110     // On retrouve le thread_id du thread
111     pthread_t thread_id;
112
113     // @@@ --------> SECTION CRITIQUE <-------- @@@
114     pthread_mutex_lock(&_threads_mutex);
115     if (_threads.find(id) != _threads.end())
116       thread_id = _threads[id].thread_id;
117     pthread_mutex_unlock(&_threads_mutex);
118     // @@@ --------> SECTION CRITIQUE <-------- @@@
119
120     cancel(thread_id);
121   }
122
123   // Methode pour le controle des jobs : suspend un job en file d'attente
124   void BatchManager_Local::holdJob(const JobId & jobid)
125   {
126     Id id;
127     istringstream iss(jobid.getReference());
128     iss >> id;
129
130     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
131
132     // On introduit une commande dans la queue du thread
133     // @@@ --------> SECTION CRITIQUE <-------- @@@
134     pthread_mutex_lock(&_threads_mutex);
135     if (_threads.find(id) != _threads.end())
136       _threads[id].command_queue.push(HOLD);
137     pthread_mutex_unlock(&_threads_mutex);
138     // @@@ --------> SECTION CRITIQUE <-------- @@@
139   }
140
141   // Methode pour le controle des jobs : relache un job suspendu
142   void BatchManager_Local::releaseJob(const JobId & jobid)
143   {
144     Id id;
145     istringstream iss(jobid.getReference());
146     iss >> id;
147
148     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
149
150     // On introduit une commande dans la queue du thread
151     // @@@ --------> SECTION CRITIQUE <-------- @@@
152     pthread_mutex_lock(&_threads_mutex);
153     if (_threads.find(id) != _threads.end())
154       _threads[id].command_queue.push(RELEASE);
155     pthread_mutex_unlock(&_threads_mutex);
156     // @@@ --------> SECTION CRITIQUE <-------- @@@
157   }
158
159
160   // Methode pour le controle des jobs : modifie un job en file d'attente
161   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
162   {
163   }
164
165   // Methode pour le controle des jobs : modifie un job en file d'attente
166   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
167   {
168     alterJob(jobid, param, Environnement());
169   }
170
171   // Methode pour le controle des jobs : modifie un job en file d'attente
172   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
173   {
174     alterJob(jobid, Parametre(), env);
175   }
176
177
178
179   // Methode pour le controle des jobs : renvoie l'etat du job
180   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
181   {
182     Id id;
183     istringstream iss(jobid.getReference());
184     iss >> id;
185
186     Parametre param;
187     Environnement env;
188
189     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
190     // @@@ --------> SECTION CRITIQUE <-------- @@@
191     pthread_mutex_lock(&_threads_mutex);
192     param = _threads[id].param;
193     env   = _threads[id].env;
194     pthread_mutex_unlock(&_threads_mutex);
195     // @@@ --------> SECTION CRITIQUE <-------- @@@
196     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
197
198     JobInfo_Local ji(param, env);
199     return ji;
200   }
201
202
203
204   // Methode pour le controle des jobs : teste si un job est present en machine
205   bool BatchManager_Local::isRunning(const JobId & jobid)
206   {
207     Id id;
208     istringstream iss(jobid.getReference());
209     iss >> id;
210
211     Status status;
212
213     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
214     // @@@ --------> SECTION CRITIQUE <-------- @@@
215     pthread_mutex_lock(&_threads_mutex);
216     status = _threads[id].status;
217     pthread_mutex_unlock(&_threads_mutex);
218     // @@@ --------> SECTION CRITIQUE <-------- @@@
219     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
220
221     return (status == RUNNING);
222   }
223
224
225   // Methode d'execution d'un job
226   pthread_t BatchManager_Local::submit(const Job_Local & job)
227   {
228     // L'id du thread a creer
229     pthread_t thread_id =
230 #ifdef WIN32
231     {0,0};
232 #else
233       0;
234 #endif
235
236     // Les attributs du thread a sa creation
237     pthread_attr_t thread_attr;
238     pthread_attr_init(&thread_attr);
239     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
240
241     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
242
243     // Creation du thread qui va executer la commande systeme qu'on lui passe
244     int rc = pthread_create(&thread_id,
245       &thread_attr,
246       &ThreadAdapter::run,
247       static_cast<void *>(p_ta));
248     if (rc) {
249     }
250
251     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
252     pthread_attr_destroy(&thread_attr);
253
254     return thread_id;
255   }
256
257
258   // Methode de destruction d'un job
259   void BatchManager_Local::cancel(pthread_t thread_id)
260   {
261     pthread_cancel(thread_id);
262   }
263
264
265   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
266   {
267     ostringstream exec_sub_cmd;
268
269 #ifdef WIN32
270     char drive[_MAX_DRIVE];
271     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
272     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
273 #endif
274
275     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
276
277     if (param.find(ARGUMENTS) != param.end()) {
278       Versatile V = param[ARGUMENTS];
279       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
280         StringType argt = * static_cast<StringType *>(*it);
281         string     arg  = argt;
282         exec_sub_cmd << " " << arg;
283       }
284     }
285
286     string user;
287     Parametre::const_iterator it = param.find(USER);
288     if (it != param.end()) {
289       user = string(it->second);
290     }
291
292     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
293   }
294
295
296   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
297   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
298   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
299   // Thread_id / Id (_thread_id_id_association_mutex)
300   BatchManager_Local::Id BatchManager_Local::nextId()
301   {
302     static Id id = 0;
303     Id nextId = id++;
304     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
305     return nextId;
306   }
307
308
309   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
310   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
311   {
312     // @@@ --------> SECTION CRITIQUE <-------- @@@
313     pthread_mutex_lock(&_thread_id_id_association_mutex);
314     bool threadIdFound = false;
315     std::list<struct ThreadIdIdAssociation>::iterator it;
316     while (!threadIdFound) {
317       for (it = _thread_id_id_association.begin() ;
318            it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
319            it++);
320       if (it == _thread_id_id_association.end())
321         pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
322       else
323         threadIdFound = true;
324     }
325
326     Id id = it->id;
327     _thread_id_id_association.erase(it);
328
329     pthread_mutex_unlock(&_thread_id_id_association_mutex);
330     // @@@ --------> SECTION CRITIQUE <-------- @@@
331
332     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
333     return id;
334   }
335
336
337   // Associe un Thread_id a un Id nouvellement cree
338   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
339   {
340     Id id = -1;
341
342     // @@@ --------> SECTION CRITIQUE <-------- @@@
343     pthread_mutex_lock(&_thread_id_id_association_mutex);
344     std::list<struct ThreadIdIdAssociation>::iterator it;
345     for (it = _thread_id_id_association.begin() ;
346          it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
347          it++);
348     if (it == _thread_id_id_association.end()) {
349       struct ThreadIdIdAssociation newAssociation;
350       id = newAssociation.id = nextId();
351       newAssociation.threadId = thread_id;
352       _thread_id_id_association.push_back(newAssociation);
353       pthread_cond_signal(&_thread_id_id_association_cond);
354
355     } else {
356       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
357     }
358     pthread_mutex_unlock(&_thread_id_id_association_mutex);
359     // @@@ --------> SECTION CRITIQUE <-------- @@@
360
361     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
362     return id;
363   }
364
365
366   // Constructeur de la classe ThreadAdapter
367   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
368   _bm(bm), _job(job)
369   {
370     // Nothing to do
371   }
372
373
374
375   // Methode d'execution du thread
376   void * BatchManager_Local::ThreadAdapter::run(void * arg)
377   {
378 #ifndef WIN32
379     // On bloque tous les signaux pour ce thread
380     sigset_t setmask;
381     sigfillset(&setmask);
382     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
383 #endif
384
385     // On autorise la terminaison differee du thread
386     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
387     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
388     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
389
390     // On enregistre la fonction de suppression du fils en cas d'arret du thread
391     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
392     // sera prise en compte par pthread_testcancel()
393     Process child;
394     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
395     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
396
397     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
398
399
400
401
402     // Le code retour cumule (ORed) de tous les appels
403     // Nul en cas de reussite de l'ensemble des operations
404     int rc = 0;
405
406     // Cette table contient la liste des fichiers a detruire a la fin du processus
407     std::vector<string> files_to_delete;
408
409
410
411     // On copie les fichiers d'entree pour le fils
412     const Parametre param   = p_ta->_job.getParametre();
413     Parametre::const_iterator it;
414
415     // On initialise la variable workdir a la valeur du Current Working Directory
416     char * cwd =
417 #ifdef WIN32
418       _getcwd(NULL, 0);
419 #else
420       new char [PATH_MAX];
421     getcwd(cwd, PATH_MAX);
422 #endif
423     string workdir = cwd;
424     delete [] cwd;
425
426     if ( (it = param.find(WORKDIR)) != param.end() ) {
427       workdir = static_cast<string>( (*it).second );
428     }
429
430     string executionhost = string(param[EXECUTIONHOST]);
431     string user;
432     if ( (it = param.find(USER)) != param.end() ) {
433       user = string(it->second);
434     }
435
436     if ( (it = param.find(INFILE)) != param.end() ) {
437       Versatile V = (*it).second;
438       Versatile::iterator Vit;
439
440       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
441         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
442         Couple cp       = cpt;
443         string local    = cp.getLocal();
444         string remote   = cp.getRemote();
445
446         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
447                                                                     workdir + "/" + remote,
448                                                                     executionhost, user);
449         if (status) {
450           // Echec de la copie
451           rc |= 1;
452         } else {
453           // On enregistre le fichier comme etant a detruire
454           files_to_delete.push_back(workdir + "/" + remote);
455         }
456
457       }
458     }
459
460
461
462
463     // On forke/exec un nouveau process pour pouvoir controler le fils
464     // (plus finement qu'avec un appel system)
465     // int rc = system(commande.c_str());
466 #ifdef WIN32
467     child = p_ta->launchWin32ChildProcess();
468     p_ta->pere(child);
469 #else
470     child = fork();
471     if (child < 0) { // erreur
472       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
473
474     } else if (child > 0) { // pere
475       p_ta->pere(child);
476
477     } else { // fils
478       p_ta->fils();
479     }
480 #endif
481
482
483     // On copie les fichiers de sortie du fils
484     if ( (it = param.find(OUTFILE)) != param.end() ) {
485       Versatile V = (*it).second;
486       Versatile::iterator Vit;
487
488       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
489         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
490         Couple cp       = cpt;
491         string local    = cp.getLocal();
492         string remote   = cp.getRemote();
493
494         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
495                                                                     executionhost, user,
496                                                                     local, "", "");
497         if (status) {
498           // Echec de la copie
499           rc |= 1;
500         } else {
501           // On enregistre le fichier comme etant a detruire
502           files_to_delete.push_back(workdir + "/" + remote);
503         }
504
505       }
506     }
507
508     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
509     // ou si la creation du fils n'a pu avoir lieu
510     if ( (rc == 0) || (child < 0) ) {
511       std::vector<string>::const_iterator it;
512       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
513         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
514 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
515         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
516 #ifdef WIN32
517         remove_cmd = string("\"") + remove_cmd + string("\"");
518 #endif
519         system(remove_cmd.c_str());*/
520       }
521     }
522
523
524
525     // On retire la fonction de nettoyage de la memoire
526     pthread_cleanup_pop(0);
527
528     // On retire la fonction de suppression du fils
529     pthread_cleanup_pop(0);
530
531
532
533     // On invoque la fonction de nettoyage de la memoire
534     delete_on_exit(arg);
535
536     UNDER_LOCK( cout << "Father is leaving" << endl );
537     pthread_exit(NULL);
538     return NULL;
539   }
540
541
542
543
544   void BatchManager_Local::ThreadAdapter::pere(Process child)
545   {
546     time_t child_starttime = time(NULL);
547
548     // On enregistre le fils dans la table des threads
549     pthread_t thread_id = pthread_self();
550     Id id = _bm.registerThread_id(thread_id);
551
552     Parametre param   = _job.getParametre();
553     Environnement env = _job.getEnvironnement();
554
555     ostringstream thread_id_sst;
556     thread_id_sst << id;
557     param[ID]         = thread_id_sst.str();
558     param[STATE]      = "Running";
559 #ifndef WIN32
560     param[PID]        = child;
561 #endif
562
563     // @@@ --------> SECTION CRITIQUE <-------- @@@
564     pthread_mutex_lock(&_bm._threads_mutex);
565     _bm._threads[id].thread_id = thread_id;
566 #ifndef WIN32
567     _bm._threads[id].pid       = child;
568 #endif
569     _bm._threads[id].status    = RUNNING;
570     _bm._threads[id].param     = param;
571     _bm._threads[id].env       = env;
572     _bm._threads[id].command_queue.push(NOP);
573     pthread_mutex_unlock(&_bm._threads_mutex);
574     // @@@ --------> SECTION CRITIQUE <-------- @@@
575
576
577
578
579     // on boucle en attendant que le fils ait termine
580     while (1) {
581 #ifdef WIN32
582       DWORD exitCode;
583       BOOL res = GetExitCodeProcess(child, &exitCode);
584       if (exitCode != STILL_ACTIVE) {
585         pthread_mutex_lock(&_bm._threads_mutex);
586         _bm._threads[id].status       = DONE;
587         _bm._threads[id].param[STATE] = "Done";
588         pthread_mutex_unlock(&_bm._threads_mutex);
589         // @@@ --------> SECTION CRITIQUE <-------- @@@
590         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
591         break;
592       }
593 #else
594       int child_rc = 0;
595       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
596       if (child_wait_rc > 0) {
597         if (WIFSTOPPED(child_rc)) {
598           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
599           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
600           // desactive car s'il est possible de detecter l'arret d'un process, il est
601           // plus difficile de detecter sa reprise.
602
603           // Le fils est simplement stoppe
604           // @@@ --------> SECTION CRITIQUE <-------- @@@
605           pthread_mutex_lock(&_bm._threads_mutex);
606           _bm._threads[id].status       = STOPPED;
607           _bm._threads[id].param[STATE] = "Stopped";
608           pthread_mutex_unlock(&_bm._threads_mutex);
609           // @@@ --------> SECTION CRITIQUE <-------- @@@
610           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
611
612         }
613         else {
614           // Le fils est termine, on sort de la boucle et du if englobant
615           // @@@ --------> SECTION CRITIQUE <-------- @@@
616           pthread_mutex_lock(&_bm._threads_mutex);
617           _bm._threads[id].status       = DONE;
618           _bm._threads[id].param[STATE] = "Done";
619           pthread_mutex_unlock(&_bm._threads_mutex);
620           // @@@ --------> SECTION CRITIQUE <-------- @@@
621           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
622           break;
623         }
624       }
625       else if (child_wait_rc == -1) {
626         // Le fils a disparu ...
627         // @@@ --------> SECTION CRITIQUE <-------- @@@
628         pthread_mutex_lock(&_bm._threads_mutex);
629         _bm._threads[id].status       = DEAD;
630         _bm._threads[id].param[STATE] = "Dead";
631         pthread_mutex_unlock(&_bm._threads_mutex);
632         // @@@ --------> SECTION CRITIQUE <-------- @@@
633         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
634         break;
635       }
636 #endif
637
638       // On teste si le thread doit etre detruit
639       pthread_testcancel();
640
641
642
643       // On regarde si le fils n'a pas depasse son temps (wallclock time)
644       time_t child_currenttime = time(NULL);
645       time_t child_elapsedtime = child_currenttime - child_starttime;
646       if (param.find(MAXWALLTIME) != param.end()) {
647         int maxwalltime = param[MAXWALLTIME];
648         //        cout << "child_starttime          = " << child_starttime        << endl
649         //             << "child_currenttime        = " << child_currenttime      << endl
650         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
651         //             << "maxwalltime              = " << maxwalltime            << endl
652         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
653         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
654           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
655           // On introduit une commande dans la queue du thread
656           // @@@ --------> SECTION CRITIQUE <-------- @@@
657           pthread_mutex_lock(&_bm._threads_mutex);
658           if (_bm._threads.find(id) != _bm._threads.end())
659             _bm._threads[id].command_queue.push(KILL);
660           pthread_mutex_unlock(&_bm._threads_mutex);
661           // @@@ --------> SECTION CRITIQUE <-------- @@@
662
663
664         } else if (child_elapsedtime > maxwalltime ) {
665           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
666           // On introduit une commande dans la queue du thread
667           // @@@ --------> SECTION CRITIQUE <-------- @@@
668           pthread_mutex_lock(&_bm._threads_mutex);
669           if (_bm._threads.find(id) != _bm._threads.end())
670             _bm._threads[id].command_queue.push(TERM);
671           pthread_mutex_unlock(&_bm._threads_mutex);
672           // @@@ --------> SECTION CRITIQUE <-------- @@@
673         }
674       }
675
676
677
678       // On regarde s'il y a quelque chose a faire dans la queue de commande
679       // @@@ --------> SECTION CRITIQUE <-------- @@@
680       pthread_mutex_lock(&_bm._threads_mutex);
681       if (_bm._threads.find(id) != _bm._threads.end()) {
682         while (_bm._threads[id].command_queue.size() > 0) {
683           Commande cmd = _bm._threads[id].command_queue.front();
684           _bm._threads[id].command_queue.pop();
685
686           switch (cmd) {
687     case NOP:
688       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
689       break;
690 #ifndef WIN32
691     case HOLD:
692       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
693       kill(child, SIGSTOP);
694       break;
695
696     case RELEASE:
697       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
698       kill(child, SIGCONT);
699       break;
700
701     case TERM:
702       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
703       kill(child, SIGTERM);
704       break;
705
706     case KILL:
707       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
708       kill(child, SIGKILL);
709       break;
710 #endif
711     case ALTER:
712       break;
713
714     default:
715       break;
716           }
717         }
718
719       }
720       pthread_mutex_unlock(&_bm._threads_mutex);
721       // @@@ --------> SECTION CRITIQUE <-------- @@@
722
723       // On fait une petite pause pour ne pas surcharger inutilement le processeur
724 #ifdef WIN32
725       Sleep(1000);
726 #else
727       sleep(1);
728 #endif
729
730     }
731
732   }
733
734
735
736 #ifndef WIN32
737
738   void BatchManager_Local::ThreadAdapter::fils()
739   {
740     Parametre param = _job.getParametre();
741     Parametre::iterator it;
742
743     try {
744
745       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
746       vector<string> command;
747       if (param.find(EXECUTABLE) != param.end()) {
748         command = _bm.exec_command(param);
749       } else exit(1);
750
751       // Build the argument array argv from the command
752       char ** argv = new char * [command.size() + 1];
753       string comstr;
754       for (string::size_type i=0 ; i<command.size() ; i++) {
755         argv[i] = new char[command[i].size() + 1];
756         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
757         if (i>0) comstr += " # ";
758         comstr += command[i];
759       }
760
761       argv[command.size()] = NULL;
762
763       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
764
765       // Create the environment for the new process. Note (RB): Here we change the environment for
766       // the process launched in local. It would seem more logical to set the environment for the
767       // remote process.
768       Environnement env = _job.getEnvironnement();
769
770       char ** envp = NULL;
771       if(env.size() > 0) {
772         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
773         int i = 0;
774         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
775           const string  & key   = (*it).first;
776           const string  & value = (*it).second;
777           ostringstream oss;
778           oss << key << "=" << value;
779           envp[i]         = new char [oss.str().size() + 1];
780           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
781         }
782
783         // assert (i == env.size())
784         envp[i] = NULL;
785       }
786
787
788
789
790       // On positionne les limites systeme imposees au fils
791       if (param.find(MAXCPUTIME) != param.end()) {
792         int maxcputime = param[MAXCPUTIME];
793         struct rlimit limit;
794         limit.rlim_cur = maxcputime;
795         limit.rlim_max = int(maxcputime * 1.1);
796         setrlimit(RLIMIT_CPU, &limit);
797       }
798
799       if (param.find(MAXDISKSIZE) != param.end()) {
800         int maxdisksize = param[MAXDISKSIZE];
801         struct rlimit limit;
802         limit.rlim_cur = maxdisksize * 1024;
803         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
804         setrlimit(RLIMIT_FSIZE, &limit);
805       }
806
807       if (param.find(MAXRAMSIZE) != param.end()) {
808         int maxramsize = param[MAXRAMSIZE];
809         struct rlimit limit;
810         limit.rlim_cur = maxramsize * 1024;
811         limit.rlim_max = int(maxramsize * 1.1) * 1024;
812         setrlimit(RLIMIT_AS, &limit);
813       }
814
815
816
817       // On cree une session pour le fils de facon a ce qu'il ne soit pas
818       // detruit lorsque le shell se termine (le shell ouvre une session et
819       // tue tous les process appartenant a la session en quittant)
820       setsid();
821
822
823       // On ferme les descripteurs de fichiers standards
824       //close(STDIN_FILENO);
825       //close(STDOUT_FILENO);
826       //close(STDERR_FILENO);
827
828
829       // On execute la commande du fils
830       execve(argv[0], argv, envp);
831
832       // No need to deallocate since nothing happens after a successful exec
833
834       // Normalement on ne devrait jamais arriver ici
835       ofstream file_err("error.log");
836       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
837
838     } catch (GenericException & e) {
839
840       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
841     }
842
843     exit(99);
844   }
845
846 #else
847
848   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
849   {
850     Parametre param = _job.getParametre();
851     Parametre::iterator it;
852     PROCESS_INFORMATION pi;
853
854     try {
855
856       // EXECUTABLE is MANDATORY, if missing, we throw an exception
857       vector<string> exec_command;
858       if (param.find(EXECUTABLE) != param.end()) {
859         exec_command = _bm.exec_command(param);
860       } else {
861         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
862       }
863
864       // Build the command string from the command argument vector
865       string comstr;
866       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
867         if (i>0) comstr += " ";
868         comstr += exec_command[i];
869       }
870
871       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
872
873       // Create the environment for the new process. Note (RB): Here we change the environment for
874       // the process launched in local. It would seem more logical to set the environment for the
875       // remote process.
876       // Note that if no environment is specified, we reuse the current environment.
877       Environnement env = _job.getEnvironnement();
878       char * chNewEnv = NULL;
879
880       if(env.size() > 0) {
881         chNewEnv = new char[4096];
882         LPTSTR lpszCurrentVariable = chNewEnv;
883         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
884           const string  & key   = (*it).first;
885           const string  & value = (*it).second;
886           string envvar = key + "=" + value;
887           envvar.copy(lpszCurrentVariable, envvar.size());
888           lpszCurrentVariable[envvar.size()] = '\0';
889           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
890         }
891         // Terminate the block with a NULL byte.
892         *lpszCurrentVariable = '\0';
893       }
894
895
896       STARTUPINFO si;
897       ZeroMemory( &si, sizeof(si) );
898       si.cb = sizeof(si);
899       ZeroMemory( &pi, sizeof(pi) );
900
901       // Copy the command to a non-const buffer
902       char * buffer = strdup(comstr.c_str());
903
904       // launch the new process
905       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
906                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
907
908       if (buffer) free(buffer);
909       if (!res) throw RunTimeException("Error while creating new process");
910
911       CloseHandle(pi.hThread);
912
913     } catch (GenericException & e) {
914
915       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
916     }
917
918     return pi.hProcess;
919   }
920
921 #endif
922
923
924   void BatchManager_Local::kill_child_on_exit(void * p_pid)
925   {
926 #ifndef WIN32
927     //TODO: porting of following functionality
928     pid_t child = * static_cast<pid_t *>(p_pid);
929
930     // On tue le fils
931     kill(child, SIGTERM);
932
933     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
934     // mais cette option n'est pas implementee pour le moment, car il est
935     // preferable de laisser le process fils se terminer normalement et seul.
936 #endif
937   }
938
939   void BatchManager_Local::delete_on_exit(void * arg)
940   {
941     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
942     delete p_ta;
943   }
944
945 }