Salome HOME
Merged from BR_AR
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #include "Batch_RunTimeException.hxx"
45 #else
46 #include <sys/wait.h>
47 #include <unistd.h>
48 #endif
49 #include <ctime>
50 #include <pthread.h>
51 #include <signal.h>
52 #include <errno.h>
53 #include <string.h>
54 #include "Batch_IOMutex.hxx"
55 #include "Batch_BatchManager_Local.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65     : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
66       _protocol(CommunicationProtocol::getInstance(protocolType)),
67       _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
68   {
69     pthread_mutex_init(&_threads_mutex, NULL);
70     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
71     pthread_cond_init(&_thread_id_id_association_cond, NULL);
72   }
73
74   // Destructeur
75   BatchManager_Local::~BatchManager_Local()
76   {
77     pthread_mutex_destroy(&_threads_mutex);
78     pthread_mutex_destroy(&_thread_id_id_association_mutex);
79     pthread_cond_destroy(&_thread_id_id_association_cond);
80   }
81
82   const CommunicationProtocol & BatchManager_Local::getProtocol() const
83   {
84     return _protocol;
85   }
86
87   // Methode pour le controle des jobs : soumet un job au gestionnaire
88   const JobId BatchManager_Local::submitJob(const Job & job)
89   {
90     Job_Local jobLocal = job;
91
92     pthread_t thread_id = submit(jobLocal);
93
94     ostringstream oss;
95     oss << getIdByThread_id(thread_id);
96
97     JobId id(this, oss.str());
98
99     return id;
100   }
101
102   // Methode pour le controle des jobs : retire un job du gestionnaire
103   void BatchManager_Local::deleteJob(const JobId & jobid)
104   {
105     Id id;
106
107     istringstream iss(jobid.getReference());
108     iss >> id;
109
110     // On retrouve le thread_id du thread
111     pthread_t thread_id;
112
113     // @@@ --------> SECTION CRITIQUE <-------- @@@
114     pthread_mutex_lock(&_threads_mutex);
115     if (_threads.find(id) != _threads.end())
116       thread_id = _threads[id].thread_id;
117     pthread_mutex_unlock(&_threads_mutex);
118     // @@@ --------> SECTION CRITIQUE <-------- @@@
119
120     cancel(thread_id);
121   }
122
123   // Methode pour le controle des jobs : suspend un job en file d'attente
124   void BatchManager_Local::holdJob(const JobId & jobid)
125   {
126     Id id;
127     istringstream iss(jobid.getReference());
128     iss >> id;
129
130     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
131
132     // On introduit une commande dans la queue du thread
133     // @@@ --------> SECTION CRITIQUE <-------- @@@
134     pthread_mutex_lock(&_threads_mutex);
135     if (_threads.find(id) != _threads.end())
136       _threads[id].command_queue.push(HOLD);
137     pthread_mutex_unlock(&_threads_mutex);
138     // @@@ --------> SECTION CRITIQUE <-------- @@@
139   }
140
141   // Methode pour le controle des jobs : relache un job suspendu
142   void BatchManager_Local::releaseJob(const JobId & jobid)
143   {
144     Id id;
145     istringstream iss(jobid.getReference());
146     iss >> id;
147
148     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
149
150     // On introduit une commande dans la queue du thread
151     // @@@ --------> SECTION CRITIQUE <-------- @@@
152     pthread_mutex_lock(&_threads_mutex);
153     if (_threads.find(id) != _threads.end())
154       _threads[id].command_queue.push(RELEASE);
155     pthread_mutex_unlock(&_threads_mutex);
156     // @@@ --------> SECTION CRITIQUE <-------- @@@
157   }
158
159
160   // Methode pour le controle des jobs : modifie un job en file d'attente
161   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
162   {
163   }
164
165   // Methode pour le controle des jobs : modifie un job en file d'attente
166   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
167   {
168     alterJob(jobid, param, Environnement());
169   }
170
171   // Methode pour le controle des jobs : modifie un job en file d'attente
172   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
173   {
174     alterJob(jobid, Parametre(), env);
175   }
176
177
178
179   // Methode pour le controle des jobs : renvoie l'etat du job
180   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
181   {
182     Id id;
183     istringstream iss(jobid.getReference());
184     iss >> id;
185
186     Parametre param;
187     Environnement env;
188
189     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
190     // @@@ --------> SECTION CRITIQUE <-------- @@@
191     pthread_mutex_lock(&_threads_mutex);
192     param = _threads[id].param;
193     env   = _threads[id].env;
194     pthread_mutex_unlock(&_threads_mutex);
195     // @@@ --------> SECTION CRITIQUE <-------- @@@
196     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
197
198     JobInfo_Local ji(param, env);
199     return ji;
200   }
201
202
203
204   // Methode pour le controle des jobs : teste si un job est present en machine
205   bool BatchManager_Local::isRunning(const JobId & jobid)
206   {
207     Id id;
208     istringstream iss(jobid.getReference());
209     iss >> id;
210
211     Status status;
212
213     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
214     // @@@ --------> SECTION CRITIQUE <-------- @@@
215     pthread_mutex_lock(&_threads_mutex);
216     status = _threads[id].status;
217     pthread_mutex_unlock(&_threads_mutex);
218     // @@@ --------> SECTION CRITIQUE <-------- @@@
219     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
220
221     return (status == RUNNING);
222   }
223
224
225   // Methode d'execution d'un job
226   pthread_t BatchManager_Local::submit(const Job_Local & job)
227   {
228     // L'id du thread a creer
229     pthread_t thread_id =
230 #ifdef WIN32
231     {0,0};
232 #else
233       0;
234 #endif
235
236     // Les attributs du thread a sa creation
237     pthread_attr_t thread_attr;
238     pthread_attr_init(&thread_attr);
239     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
240
241     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
242
243     // Creation du thread qui va executer la commande systeme qu'on lui passe
244     int rc = pthread_create(&thread_id,
245       &thread_attr,
246       &ThreadAdapter::run,
247       static_cast<void *>(p_ta));
248     if (rc) {
249     }
250
251     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
252     pthread_attr_destroy(&thread_attr);
253
254     return thread_id;
255   }
256
257
258   // Methode de destruction d'un job
259   void BatchManager_Local::cancel(pthread_t thread_id)
260   {
261     pthread_cancel(thread_id);
262   }
263
264
265   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
266   {
267     ostringstream exec_sub_cmd;
268
269 #ifdef WIN32
270     char drive[_MAX_DRIVE];
271     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
272     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
273 #endif
274
275     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
276
277     if (param.find(ARGUMENTS) != param.end()) {
278       Versatile V = param[ARGUMENTS];
279       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
280         StringType argt = * static_cast<StringType *>(*it);
281         string     arg  = argt;
282         exec_sub_cmd << " " << arg;
283       }
284     }
285
286     string user;
287     Parametre::const_iterator it = param.find(USER);
288     if (it != param.end()) {
289       user = string(it->second);
290     }
291
292     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
293   }
294
295
296   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
297   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
298   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
299   // Thread_id / Id (_thread_id_id_association_mutex)
300   BatchManager_Local::Id BatchManager_Local::nextId()
301   {
302     static Id id = 0;
303     Id nextId = id++;
304     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
305     return nextId;
306   }
307
308
309   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
310   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
311   {
312     // @@@ --------> SECTION CRITIQUE <-------- @@@
313     pthread_mutex_lock(&_thread_id_id_association_mutex);
314     bool threadIdFound = false;
315     std::list<struct ThreadIdIdAssociation>::iterator it;
316     while (!threadIdFound) {
317       for (it = _thread_id_id_association.begin() ;
318            it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
319            it++);
320       if (it == _thread_id_id_association.end())
321         pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
322       else
323         threadIdFound = true;
324     }
325
326     Id id = it->id;
327     _thread_id_id_association.erase(it);
328
329     pthread_mutex_unlock(&_thread_id_id_association_mutex);
330     // @@@ --------> SECTION CRITIQUE <-------- @@@
331
332     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
333     return id;
334   }
335
336
337   // Associe un Thread_id a un Id nouvellement cree
338   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
339   {
340     Id id = -1;
341
342     // @@@ --------> SECTION CRITIQUE <-------- @@@
343     pthread_mutex_lock(&_thread_id_id_association_mutex);
344     std::list<struct ThreadIdIdAssociation>::iterator it;
345     for (it = _thread_id_id_association.begin() ;
346          it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
347          it++);
348     if (it == _thread_id_id_association.end()) {
349       struct ThreadIdIdAssociation newAssociation;
350       id = newAssociation.id = nextId();
351       newAssociation.threadId = thread_id;
352       _thread_id_id_association.push_back(newAssociation);
353       pthread_cond_signal(&_thread_id_id_association_cond);
354
355     } else {
356       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
357     }
358     pthread_mutex_unlock(&_thread_id_id_association_mutex);
359     // @@@ --------> SECTION CRITIQUE <-------- @@@
360
361     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
362     return id;
363   }
364
365
366   // Constructeur de la classe ThreadAdapter
367   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
368   _bm(bm), _job(job)
369   {
370     // Nothing to do
371   }
372
373
374
375   // Methode d'execution du thread
376   void * BatchManager_Local::ThreadAdapter::run(void * arg)
377   {
378 #ifndef WIN32
379     // On bloque tous les signaux pour ce thread
380     sigset_t setmask;
381     sigfillset(&setmask);
382     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
383 #endif
384
385     // On autorise la terminaison differee du thread
386     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
387     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
388     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
389
390     // On enregistre la fonction de suppression du fils en cas d'arret du thread
391     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
392     // sera prise en compte par pthread_testcancel()
393     Process child;
394     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
395     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
396
397     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
398
399
400
401
402     // Le code retour cumule (ORed) de tous les appels
403     // Nul en cas de reussite de l'ensemble des operations
404     int rc = 0;
405
406     // Cette table contient la liste des fichiers a detruire a la fin du processus
407     std::vector<string> files_to_delete;
408
409
410
411     // On copie les fichiers d'entree pour le fils
412     const Parametre param   = p_ta->_job.getParametre();
413     Parametre::const_iterator it;
414
415     // On initialise la variable workdir a la valeur du Current Working Directory
416     char * cwd =
417 #ifdef WIN32
418       _getcwd(NULL, 0);
419 #else
420       new char [PATH_MAX];
421     getcwd(cwd, PATH_MAX);
422 #endif
423     string workdir = cwd;
424     delete [] cwd;
425
426     if ( (it = param.find(WORKDIR)) != param.end() ) {
427       workdir = static_cast<string>( (*it).second );
428     }
429
430     string executionhost = string(param[EXECUTIONHOST]);
431     string user;
432     if ( (it = param.find(USER)) != param.end() ) {
433       user = string(it->second);
434     }
435
436     if ( (it = param.find(INFILE)) != param.end() ) {
437       Versatile V = (*it).second;
438       Versatile::iterator Vit;
439
440       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
441         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
442         Couple cp       = cpt;
443         string local    = cp.getLocal();
444         string remote   = cp.getRemote();
445
446         std::cerr << workdir << std::endl;
447         std::cerr << remote << std::endl;
448
449         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
450                                                                     workdir + "/" + remote,
451                                                                     executionhost, user);
452         if (status) {
453           // Echec de la copie
454           rc |= 1;
455         } else {
456           // On enregistre le fichier comme etant a detruire
457           files_to_delete.push_back(workdir + "/" + remote);
458         }
459
460       }
461     }
462
463
464
465
466     // On forke/exec un nouveau process pour pouvoir controler le fils
467     // (plus finement qu'avec un appel system)
468     // int rc = system(commande.c_str());
469     //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
470     //execv("/usr/bin/ssh", parmList);
471 #ifdef WIN32
472     child = p_ta->launchWin32ChildProcess();
473     p_ta->pere(child);
474 #else
475     child = fork();
476     if (child < 0) { // erreur
477       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
478
479     } else if (child > 0) { // pere
480       p_ta->pere(child);
481
482     } else { // fils
483       p_ta->fils();
484     }
485 #endif
486
487
488     // On copie les fichiers de sortie du fils
489     if ( (it = param.find(OUTFILE)) != param.end() ) {
490       Versatile V = (*it).second;
491       Versatile::iterator Vit;
492
493       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
494         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
495         Couple cp       = cpt;
496         string local    = cp.getLocal();
497         string remote   = cp.getRemote();
498
499         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
500                                                                     executionhost, user,
501                                                                     local, "", "");
502         if (status) {
503           // Echec de la copie
504           rc |= 1;
505         } else {
506           // On enregistre le fichier comme etant a detruire
507           files_to_delete.push_back(workdir + "/" + remote);
508         }
509
510       }
511     }
512
513     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
514     // ou si la creation du fils n'a pu avoir lieu
515     if ( (rc == 0) || (child < 0) ) {
516       std::vector<string>::const_iterator it;
517       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
518         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
519 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
520         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
521 #ifdef WIN32
522         remove_cmd = string("\"") + remove_cmd + string("\"");
523 #endif
524         system(remove_cmd.c_str());*/
525       }
526     }
527
528
529
530     // On retire la fonction de nettoyage de la memoire
531     pthread_cleanup_pop(0);
532
533     // On retire la fonction de suppression du fils
534     pthread_cleanup_pop(0);
535
536
537
538     // On invoque la fonction de nettoyage de la memoire
539     delete_on_exit(arg);
540
541     UNDER_LOCK( cout << "Father is leaving" << endl );
542     pthread_exit(NULL);
543     return NULL;
544   }
545
546
547
548
549   void BatchManager_Local::ThreadAdapter::pere(Process child)
550   {
551     time_t child_starttime = time(NULL);
552
553     // On enregistre le fils dans la table des threads
554     pthread_t thread_id = pthread_self();
555     Id id = _bm.registerThread_id(thread_id);
556
557     Parametre param   = _job.getParametre();
558     Environnement env = _job.getEnvironnement();
559
560     ostringstream thread_id_sst;
561     thread_id_sst << id;
562     param[ID]         = thread_id_sst.str();
563     param[STATE]      = "Running";
564 #ifndef WIN32
565     param[PID]        = child;
566 #endif
567
568     // @@@ --------> SECTION CRITIQUE <-------- @@@
569     pthread_mutex_lock(&_bm._threads_mutex);
570     _bm._threads[id].thread_id = thread_id;
571 #ifndef WIN32
572     _bm._threads[id].pid       = child;
573 #endif
574     _bm._threads[id].status    = RUNNING;
575     _bm._threads[id].param     = param;
576     _bm._threads[id].env       = env;
577     _bm._threads[id].command_queue.push(NOP);
578     pthread_mutex_unlock(&_bm._threads_mutex);
579     // @@@ --------> SECTION CRITIQUE <-------- @@@
580
581
582
583
584     // on boucle en attendant que le fils ait termine
585     while (1) {
586 #ifdef WIN32
587       DWORD exitCode;
588       BOOL res = GetExitCodeProcess(child, &exitCode);
589       if (exitCode != STILL_ACTIVE) {
590         pthread_mutex_lock(&_bm._threads_mutex);
591         _bm._threads[id].status       = DONE;
592         _bm._threads[id].param[STATE] = "Done";
593         pthread_mutex_unlock(&_bm._threads_mutex);
594         // @@@ --------> SECTION CRITIQUE <-------- @@@
595         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
596         break;
597       }
598 #else
599       int child_rc = 0;
600       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
601       if (child_wait_rc > 0) {
602          UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
603          UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
604          UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
605          UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
606          UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
607          UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
608          UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
609          UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl);
610         if (WIFSTOPPED(child_rc)) {
611           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
612           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
613           // desactive car s'il est possible de detecter l'arret d'un process, il est
614           // plus difficile de detecter sa reprise.
615
616           // Le fils est simplement stoppe
617           // @@@ --------> SECTION CRITIQUE <-------- @@@
618           pthread_mutex_lock(&_bm._threads_mutex);
619           _bm._threads[id].status       = STOPPED;
620           _bm._threads[id].param[STATE] = "Stopped";
621           pthread_mutex_unlock(&_bm._threads_mutex);
622           // @@@ --------> SECTION CRITIQUE <-------- @@@
623           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
624
625         }
626         else {
627           // Le fils est termine, on sort de la boucle et du if englobant
628           // @@@ --------> SECTION CRITIQUE <-------- @@@
629           pthread_mutex_lock(&_bm._threads_mutex);
630           _bm._threads[id].status       = DONE;
631           _bm._threads[id].param[STATE] = "Done";
632           pthread_mutex_unlock(&_bm._threads_mutex);
633           // @@@ --------> SECTION CRITIQUE <-------- @@@
634           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
635           break;
636         }
637       }
638       else if (child_wait_rc == -1) {
639         // Le fils a disparu ...
640         // @@@ --------> SECTION CRITIQUE <-------- @@@
641         pthread_mutex_lock(&_bm._threads_mutex);
642         _bm._threads[id].status       = DEAD;
643         _bm._threads[id].param[STATE] = "Dead";
644         pthread_mutex_unlock(&_bm._threads_mutex);
645         // @@@ --------> SECTION CRITIQUE <-------- @@@
646         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
647         break;
648       }
649 #endif
650
651       // On teste si le thread doit etre detruit
652       pthread_testcancel();
653
654
655
656       // On regarde si le fils n'a pas depasse son temps (wallclock time)
657       time_t child_currenttime = time(NULL);
658       time_t child_elapsedtime = child_currenttime - child_starttime;
659       if (param.find(MAXWALLTIME) != param.end()) {
660         int maxwalltime = param[MAXWALLTIME];
661         //        cout << "child_starttime          = " << child_starttime        << endl
662         //             << "child_currenttime        = " << child_currenttime      << endl
663         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
664         //             << "maxwalltime              = " << maxwalltime            << endl
665         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
666         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
667           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
668           // On introduit une commande dans la queue du thread
669           // @@@ --------> SECTION CRITIQUE <-------- @@@
670           pthread_mutex_lock(&_bm._threads_mutex);
671           if (_bm._threads.find(id) != _bm._threads.end())
672             _bm._threads[id].command_queue.push(KILL);
673           pthread_mutex_unlock(&_bm._threads_mutex);
674           // @@@ --------> SECTION CRITIQUE <-------- @@@
675
676
677         } else if (child_elapsedtime > maxwalltime ) {
678           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
679           // On introduit une commande dans la queue du thread
680           // @@@ --------> SECTION CRITIQUE <-------- @@@
681           pthread_mutex_lock(&_bm._threads_mutex);
682           if (_bm._threads.find(id) != _bm._threads.end())
683             _bm._threads[id].command_queue.push(TERM);
684           pthread_mutex_unlock(&_bm._threads_mutex);
685           // @@@ --------> SECTION CRITIQUE <-------- @@@
686         }
687       }
688
689
690
691       // On regarde s'il y a quelque chose a faire dans la queue de commande
692       // @@@ --------> SECTION CRITIQUE <-------- @@@
693       pthread_mutex_lock(&_bm._threads_mutex);
694       if (_bm._threads.find(id) != _bm._threads.end()) {
695         while (_bm._threads[id].command_queue.size() > 0) {
696           Commande cmd = _bm._threads[id].command_queue.front();
697           _bm._threads[id].command_queue.pop();
698
699           switch (cmd) {
700     case NOP:
701       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
702       break;
703 #ifndef WIN32
704     case HOLD:
705       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
706       kill(child, SIGSTOP);
707       break;
708
709     case RELEASE:
710       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
711       kill(child, SIGCONT);
712       break;
713
714     case TERM:
715       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
716       kill(child, SIGTERM);
717       break;
718
719     case KILL:
720       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
721       kill(child, SIGKILL);
722       break;
723 #endif
724     case ALTER:
725       break;
726
727     default:
728       break;
729           }
730         }
731
732       }
733       pthread_mutex_unlock(&_bm._threads_mutex);
734       // @@@ --------> SECTION CRITIQUE <-------- @@@
735
736       // On fait une petite pause pour ne pas surcharger inutilement le processeur
737 #ifdef WIN32
738       Sleep(1000);
739 #else
740       sleep(1);
741 #endif
742
743     }
744
745   }
746
747
748
749 #ifndef WIN32
750
751   void BatchManager_Local::ThreadAdapter::fils()
752   {
753     Parametre param = _job.getParametre();
754     Parametre::iterator it;
755
756       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
757       //int result = execv("/usr/bin/ssh", parmList);
758       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
759       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
760     try {
761
762       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
763       vector<string> command;
764       if (param.find(EXECUTABLE) != param.end()) {
765         command = _bm.exec_command(param);
766       } else exit(1);
767
768       // Build the argument array argv from the command
769       char ** argv = new char * [command.size() + 1];
770       string comstr;
771       for (string::size_type i=0 ; i<command.size() ; i++) {
772         argv[i] = new char[command[i].size() + 1];
773         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
774         if (i>0) comstr += " # ";
775         comstr += command[i];
776       }
777
778       argv[command.size()] = NULL;
779
780       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
781       UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
782
783       // Create the environment for the new process. Note (RB): Here we change the environment for
784       // the process launched in local. It would seem more logical to set the environment for the
785       // remote process.
786       Environnement env = _job.getEnvironnement();
787
788       char ** envp = NULL;
789       if(env.size() > 0) {
790         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
791         int i = 0;
792         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
793           const string  & key   = (*it).first;
794           const string  & value = (*it).second;
795           ostringstream oss;
796           oss << key << "=" << value;
797           envp[i]         = new char [oss.str().size() + 1];
798           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
799         }
800
801         // assert (i == env.size())
802         envp[i] = NULL;
803       }
804
805       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
806       //int result = execv("/usr/bin/ssh", parmList);
807       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
808       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
809
810
811
812       // On positionne les limites systeme imposees au fils
813       if (param.find(MAXCPUTIME) != param.end()) {
814         int maxcputime = param[MAXCPUTIME];
815         struct rlimit limit;
816         limit.rlim_cur = maxcputime;
817         limit.rlim_max = int(maxcputime * 1.1);
818         setrlimit(RLIMIT_CPU, &limit);
819       }
820
821       if (param.find(MAXDISKSIZE) != param.end()) {
822         int maxdisksize = param[MAXDISKSIZE];
823         struct rlimit limit;
824         limit.rlim_cur = maxdisksize * 1024;
825         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
826         setrlimit(RLIMIT_FSIZE, &limit);
827       }
828
829       if (param.find(MAXRAMSIZE) != param.end()) {
830         int maxramsize = param[MAXRAMSIZE];
831         struct rlimit limit;
832         limit.rlim_cur = maxramsize * 1024;
833         limit.rlim_max = int(maxramsize * 1.1) * 1024;
834         setrlimit(RLIMIT_AS, &limit);
835       }
836
837
838       //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
839       //int result = execv("/usr/bin/ssh", parmList);
840       //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
841       //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
842
843       // On cree une session pour le fils de facon a ce qu'il ne soit pas
844       // detruit lorsque le shell se termine (le shell ouvre une session et
845       // tue tous les process appartenant a la session en quittant)
846       setsid();
847
848
849       // On ferme les descripteurs de fichiers standards
850       //close(STDIN_FILENO);
851       //close(STDOUT_FILENO);
852       //close(STDERR_FILENO);
853
854
855       // On execute la commande du fils
856       int result = execve(argv[0], argv, envp);
857       UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
858       // No need to deallocate since nothing happens after a successful exec
859
860       // Normalement on ne devrait jamais arriver ici
861       ofstream file_err("error.log");
862       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
863
864     } catch (GenericException & e) {
865
866       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
867     }
868
869     exit(99);
870   }
871
872 #else
873
874   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
875   {
876     Parametre param = _job.getParametre();
877     Parametre::iterator it;
878     PROCESS_INFORMATION pi;
879
880     try {
881
882       // EXECUTABLE is MANDATORY, if missing, we throw an exception
883       vector<string> exec_command;
884       if (param.find(EXECUTABLE) != param.end()) {
885         exec_command = _bm.exec_command(param);
886       } else {
887         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
888       }
889
890       // Build the command string from the command argument vector
891       string comstr;
892       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
893         if (i>0) comstr += " ";
894         comstr += exec_command[i];
895       }
896
897       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
898
899       // Create the environment for the new process. Note (RB): Here we change the environment for
900       // the process launched in local. It would seem more logical to set the environment for the
901       // remote process.
902       // Note that if no environment is specified, we reuse the current environment.
903       Environnement env = _job.getEnvironnement();
904       char * chNewEnv = NULL;
905
906       if(env.size() > 0) {
907         chNewEnv = new char[4096];
908         LPTSTR lpszCurrentVariable = chNewEnv;
909         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
910           const string  & key   = (*it).first;
911           const string  & value = (*it).second;
912           string envvar = key + "=" + value;
913           envvar.copy(lpszCurrentVariable, envvar.size());
914           lpszCurrentVariable[envvar.size()] = '\0';
915           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
916         }
917         // Terminate the block with a NULL byte.
918         *lpszCurrentVariable = '\0';
919       }
920
921
922       STARTUPINFO si;
923       ZeroMemory( &si, sizeof(si) );
924       si.cb = sizeof(si);
925       ZeroMemory( &pi, sizeof(pi) );
926
927       // Copy the command to a non-const buffer
928       char * buffer = strdup(comstr.c_str());
929
930       // launch the new process
931       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
932                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
933
934       if (buffer) free(buffer);
935       if (!res) throw RunTimeException("Error while creating new process");
936
937       CloseHandle(pi.hThread);
938
939     } catch (GenericException & e) {
940
941       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
942     }
943
944     return pi.hProcess;
945   }
946
947 #endif
948
949
950   void BatchManager_Local::kill_child_on_exit(void * p_pid)
951   {
952 #ifndef WIN32
953     //TODO: porting of following functionality
954     pid_t child = * static_cast<pid_t *>(p_pid);
955
956     // On tue le fils
957     kill(child, SIGTERM);
958
959     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
960     // mais cette option n'est pas implementee pour le moment, car il est
961     // preferable de laisser le process fils se terminer normalement et seul.
962 #endif
963   }
964
965   void BatchManager_Local::delete_on_exit(void * arg)
966   {
967     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
968     delete p_ta;
969   }
970
971 }