Salome HOME
Added support for Microsoft Visual C++ (also experimental)
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 */
31
32 #include <iostream>
33 #include <fstream>
34 #include <sstream>
35 #include <cstdlib>
36 #include <limits.h>
37
38 #include <sys/types.h>
39 #ifdef WIN32
40 # include <direct.h>
41 #include "Batch_RunTimeException.hxx"
42 #else
43 # include <sys/wait.h>
44 # include <unistd.h>
45 #endif
46 #include <ctime>
47 #include <pthread.h>
48 #include <signal.h>
49 #include <errno.h>
50 #include <string.h>
51 #include "Batch_IOMutex.hxx"
52 #include "Batch_BatchManager_Local.hxx"
53
54 using namespace std;
55
56 namespace Batch {
57
58
59   // Constructeur
60   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host)
61       throw(InvalidArgumentException,ConnexionFailureException)
62     : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(),
63       _thread_id_id_association_cond(), _thread_id_id_association()
64   {
65     pthread_mutex_init(&_threads_mutex, NULL);
66     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
67     pthread_cond_init(&_thread_id_id_association_cond, NULL);
68   }
69
70   // Destructeur
71   BatchManager_Local::~BatchManager_Local()
72   {
73     pthread_mutex_destroy(&_threads_mutex);
74     pthread_mutex_destroy(&_thread_id_id_association_mutex);
75     pthread_cond_destroy(&_thread_id_id_association_cond);
76   }
77
78   // Methode pour le controle des jobs : soumet un job au gestionnaire
79   const JobId BatchManager_Local::submitJob(const Job & job)
80   {
81     Job_Local jobLocal = job;
82
83     pthread_t thread_id = submit(jobLocal);
84
85     ostringstream oss;
86     oss << getIdByThread_id(thread_id);
87
88     JobId id(this, oss.str());
89
90     return id;
91   }
92
93   // Methode pour le controle des jobs : retire un job du gestionnaire
94   void BatchManager_Local::deleteJob(const JobId & jobid)
95   {
96     Id id;
97
98     istringstream iss(jobid.getReference());
99     iss >> id;
100
101     // On retrouve le thread_id du thread
102     pthread_t thread_id;
103
104     // @@@ --------> SECTION CRITIQUE <-------- @@@
105     pthread_mutex_lock(&_threads_mutex);
106     if (_threads.find(id) != _threads.end())
107       thread_id = _threads[id].thread_id;
108     pthread_mutex_unlock(&_threads_mutex);
109     // @@@ --------> SECTION CRITIQUE <-------- @@@
110
111     cancel(thread_id);
112   }
113
114   // Methode pour le controle des jobs : suspend un job en file d'attente
115   void BatchManager_Local::holdJob(const JobId & jobid)
116   {
117     Id id;
118     istringstream iss(jobid.getReference());
119     iss >> id;
120
121     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
122
123     // On introduit une commande dans la queue du thread
124     // @@@ --------> SECTION CRITIQUE <-------- @@@
125     pthread_mutex_lock(&_threads_mutex);
126     if (_threads.find(id) != _threads.end())
127       _threads[id].command_queue.push(HOLD);
128     pthread_mutex_unlock(&_threads_mutex);
129     // @@@ --------> SECTION CRITIQUE <-------- @@@
130   }
131
132   // Methode pour le controle des jobs : relache un job suspendu
133   void BatchManager_Local::releaseJob(const JobId & jobid)
134   {
135     Id id;
136     istringstream iss(jobid.getReference());
137     iss >> id;
138
139     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
140
141     // On introduit une commande dans la queue du thread
142     // @@@ --------> SECTION CRITIQUE <-------- @@@
143     pthread_mutex_lock(&_threads_mutex);
144     if (_threads.find(id) != _threads.end())
145       _threads[id].command_queue.push(RELEASE);
146     pthread_mutex_unlock(&_threads_mutex);
147     // @@@ --------> SECTION CRITIQUE <-------- @@@
148   }
149
150
151   // Methode pour le controle des jobs : modifie un job en file d'attente
152   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
153   {
154   }
155
156   // Methode pour le controle des jobs : modifie un job en file d'attente
157   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
158   {
159     alterJob(jobid, param, Environnement());
160   }
161
162   // Methode pour le controle des jobs : modifie un job en file d'attente
163   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
164   {
165     alterJob(jobid, Parametre(), env);
166   }
167
168
169
170   // Methode pour le controle des jobs : renvoie l'etat du job
171   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
172   {
173     Id id;
174     istringstream iss(jobid.getReference());
175     iss >> id;
176
177     Parametre param;
178     Environnement env;
179
180     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
181     // @@@ --------> SECTION CRITIQUE <-------- @@@
182     pthread_mutex_lock(&_threads_mutex);
183     param = _threads[id].param;
184     env   = _threads[id].env;
185     pthread_mutex_unlock(&_threads_mutex);
186     // @@@ --------> SECTION CRITIQUE <-------- @@@
187     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
188
189     JobInfo_Local ji(param, env);
190     return ji;
191   }
192
193
194
195   // Methode pour le controle des jobs : teste si un job est present en machine
196   bool BatchManager_Local::isRunning(const JobId & jobid)
197   {
198     Id id;
199     istringstream iss(jobid.getReference());
200     iss >> id;
201
202     Status status;
203
204     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
205     // @@@ --------> SECTION CRITIQUE <-------- @@@
206     pthread_mutex_lock(&_threads_mutex);
207     status = _threads[id].status;
208     pthread_mutex_unlock(&_threads_mutex);
209     // @@@ --------> SECTION CRITIQUE <-------- @@@
210     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
211
212     return (status == RUNNING);
213   }
214
215
216   // Methode d'execution d'un job
217   pthread_t BatchManager_Local::submit(const Job_Local & job)
218   {
219     // L'id du thread a creer
220     pthread_t thread_id =
221 #ifdef WIN32
222     {0,0};
223 #else
224       0;
225 #endif
226
227     // Les attributs du thread a sa creation
228     pthread_attr_t thread_attr;
229     pthread_attr_init(&thread_attr);
230     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
231
232     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
233
234     // Creation du thread qui va executer la commande systeme qu'on lui passe
235     int rc = pthread_create(&thread_id,
236       &thread_attr,
237       &ThreadAdapter::run,
238       static_cast<void *>(p_ta));
239     if (rc) {
240     }
241
242     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
243     pthread_attr_destroy(&thread_attr);
244
245     return thread_id;
246   }
247
248
249   // Methode de destruction d'un job
250   void BatchManager_Local::cancel(pthread_t thread_id)
251   {
252     pthread_cancel(thread_id);
253   }
254
255
256   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
257   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
258   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
259   // Thread_id / Id (_thread_id_id_association_mutex)
260   BatchManager_Local::Id BatchManager_Local::nextId()
261   {
262     static Id id = 0;
263     Id nextId = id++;
264     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
265     return nextId;
266   }
267
268
269   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
270   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
271   {
272     // @@@ --------> SECTION CRITIQUE <-------- @@@
273     pthread_mutex_lock(&_thread_id_id_association_mutex);
274     bool threadIdFound = false;
275     std::list<struct ThreadIdIdAssociation>::iterator it;
276     while (!threadIdFound) {
277       for (it = _thread_id_id_association.begin() ;
278            it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
279            it++);
280       if (it == _thread_id_id_association.end())
281         pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
282       else
283         threadIdFound = true;
284     }
285
286     Id id = it->id;
287     _thread_id_id_association.erase(it);
288
289     pthread_mutex_unlock(&_thread_id_id_association_mutex);
290     // @@@ --------> SECTION CRITIQUE <-------- @@@
291
292     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
293     return id;
294   }
295
296
297   // Associe un Thread_id a un Id nouvellement cree
298   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
299   {
300     Id id = -1;
301
302     // @@@ --------> SECTION CRITIQUE <-------- @@@
303     pthread_mutex_lock(&_thread_id_id_association_mutex);
304     std::list<struct ThreadIdIdAssociation>::iterator it;
305     for (it = _thread_id_id_association.begin() ;
306          it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
307          it++);
308     if (it == _thread_id_id_association.end()) {
309       struct ThreadIdIdAssociation newAssociation;
310       id = newAssociation.id = nextId();
311       newAssociation.threadId = thread_id;
312       _thread_id_id_association.push_back(newAssociation);
313       pthread_cond_signal(&_thread_id_id_association_cond);
314
315     } else {
316       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
317     }
318     pthread_mutex_unlock(&_thread_id_id_association_mutex);
319     // @@@ --------> SECTION CRITIQUE <-------- @@@
320
321     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
322     return id;
323   }
324
325
326   // Constructeur de la classe ThreadAdapter
327   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
328   _bm(bm), _job(job)
329   {
330     // Nothing to do
331   }
332
333
334
335   // Methode d'execution du thread
336   void * BatchManager_Local::ThreadAdapter::run(void * arg)
337   {
338 #ifndef WIN32
339     // On bloque tous les signaux pour ce thread
340     sigset_t setmask;
341     sigfillset(&setmask);
342     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
343 #endif
344
345     // On autorise la terminaison differee du thread
346     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
347     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
348     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
349
350     // On enregistre la fonction de suppression du fils en cas d'arret du thread
351     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
352     // sera prise en compte par pthread_testcancel()
353     Process child;
354     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
355     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
356
357     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
358
359
360
361
362     // Le code retour cumule (ORed) de tous les appels
363     // Nul en cas de reussite de l'ensemble des operations
364     int rc = 0;
365
366     // Cette table contient la liste des fichiers a detruire a la fin du processus
367     std::vector<string> files_to_delete;
368
369
370
371     // On copie les fichiers d'entree pour le fils
372     const Parametre param   = p_ta->_job.getParametre();
373     Parametre::const_iterator it;
374
375     // On initialise la variable workdir a la valeur du Current Working Directory
376     char * cwd =
377 #ifdef WIN32
378       _getcwd(NULL, 0);
379 #else
380       new char [PATH_MAX];
381     getcwd(cwd, PATH_MAX);
382 #endif
383     string workdir = cwd;
384     delete [] cwd;
385
386     if ( (it = param.find(WORKDIR)) != param.end() ) {
387       workdir = static_cast<string>( (*it).second );
388     }
389
390     string executionhost = string(param[EXECUTIONHOST]);
391     string user;
392     if ( (it = param.find(USER)) != param.end() ) {
393       user = string(it->second);
394     }
395
396     if ( (it = param.find(INFILE)) != param.end() ) {
397       Versatile V = (*it).second;
398       Versatile::iterator Vit;
399
400       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
401         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
402         Couple cp       = cpt;
403         string local    = cp.getLocal();
404         string remote   = cp.getRemote();
405
406         string copy_cmd = p_ta->getBatchManager().copy_command("", "", local, user,
407                                                                executionhost, workdir + "/" + remote);
408         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
409 #ifdef WIN32
410         copy_cmd = string("\"") + copy_cmd + string("\"");
411 #endif
412
413         if (system(copy_cmd.c_str()) ) {
414           // Echec de la copie
415           rc |= 1;
416         } else {
417           // On enregistre le fichier comme etant a detruire
418           files_to_delete.push_back(workdir + "/" + remote);
419         }
420
421       }
422     }
423
424
425
426
427     // On forke/exec un nouveau process pour pouvoir controler le fils
428     // (plus finement qu'avec un appel system)
429     // int rc = system(commande.c_str());
430 #ifdef WIN32
431     child = p_ta->launchWin32ChildProcess();
432     p_ta->pere(child);
433 #else
434     child = fork();
435     if (child < 0) { // erreur
436       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
437
438     } else if (child > 0) { // pere
439       p_ta->pere(child);
440
441     } else { // fils
442       p_ta->fils();
443     }
444 #endif
445
446
447     // On copie les fichiers de sortie du fils
448     if ( (it = param.find(OUTFILE)) != param.end() ) {
449       Versatile V = (*it).second;
450       Versatile::iterator Vit;
451
452       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
453         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
454         Couple cp       = cpt;
455         string local    = cp.getLocal();
456         string remote   = cp.getRemote();
457
458         string copy_cmd = p_ta->getBatchManager().copy_command(user, executionhost, workdir + "/" + remote,
459                                                                "", "", local);
460         UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
461 #ifdef WIN32
462         copy_cmd = string("\"") + copy_cmd + string("\"");
463 #endif
464
465         if (system(copy_cmd.c_str()) ) {
466           // Echec de la copie
467           rc |= 1;
468         } else {
469           // On enregistre le fichier comme etant a detruire
470           files_to_delete.push_back(workdir + "/" + remote);
471         }
472
473       }
474     }
475
476     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
477     // ou si la creation du fils n'a pu avoir lieu
478     if ( (rc == 0) || (child < 0) ) {
479       std::vector<string>::const_iterator it;
480       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
481         string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
482         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
483 #ifdef WIN32
484         remove_cmd = string("\"") + remove_cmd + string("\"");
485 #endif
486         system(remove_cmd.c_str());
487       }
488     }
489
490
491
492     // On retire la fonction de nettoyage de la memoire
493     pthread_cleanup_pop(0);
494
495     // On retire la fonction de suppression du fils
496     pthread_cleanup_pop(0);
497
498
499
500     // On invoque la fonction de nettoyage de la memoire
501     delete_on_exit(arg);
502
503     UNDER_LOCK( cout << "Father is leaving" << endl );
504     pthread_exit(NULL);
505     return NULL;
506   }
507
508
509
510
511   void BatchManager_Local::ThreadAdapter::pere(Process child)
512   {
513     time_t child_starttime = time(NULL);
514
515     // On enregistre le fils dans la table des threads
516     pthread_t thread_id = pthread_self();
517     Id id = _bm.registerThread_id(thread_id);
518
519     Parametre param   = _job.getParametre();
520     Environnement env = _job.getEnvironnement();
521
522     ostringstream thread_id_sst;
523     thread_id_sst << id;
524     param[ID]         = thread_id_sst.str();
525     param[STATE]      = "Running";
526 #ifndef WIN32
527     param[PID]        = child;
528 #endif
529
530     // @@@ --------> SECTION CRITIQUE <-------- @@@
531     pthread_mutex_lock(&_bm._threads_mutex);
532     _bm._threads[id].thread_id = thread_id;
533 #ifndef WIN32
534     _bm._threads[id].pid       = child;
535 #endif
536     _bm._threads[id].status    = RUNNING;
537     _bm._threads[id].param     = param;
538     _bm._threads[id].env       = env;
539     _bm._threads[id].command_queue.push(NOP);
540     pthread_mutex_unlock(&_bm._threads_mutex);
541     // @@@ --------> SECTION CRITIQUE <-------- @@@
542
543
544
545
546     // on boucle en attendant que le fils ait termine
547     while (1) {
548 #ifdef WIN32
549       DWORD exitCode;
550       BOOL res = GetExitCodeProcess(child, &exitCode);
551       if (exitCode != STILL_ACTIVE) {
552         pthread_mutex_lock(&_bm._threads_mutex);
553         _bm._threads[id].status       = DONE;
554         _bm._threads[id].param[STATE] = "Done";
555         pthread_mutex_unlock(&_bm._threads_mutex);
556         // @@@ --------> SECTION CRITIQUE <-------- @@@
557         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
558         break;
559       }
560 #else
561       int child_rc = 0;
562       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
563       if (child_wait_rc > 0) {
564         if (WIFSTOPPED(child_rc)) {
565           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
566           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
567           // desactive car s'il est possible de detecter l'arret d'un process, il est
568           // plus difficile de detecter sa reprise.
569
570           // Le fils est simplement stoppe
571           // @@@ --------> SECTION CRITIQUE <-------- @@@
572           pthread_mutex_lock(&_bm._threads_mutex);
573           _bm._threads[id].status       = STOPPED;
574           _bm._threads[id].param[STATE] = "Stopped";
575           pthread_mutex_unlock(&_bm._threads_mutex);
576           // @@@ --------> SECTION CRITIQUE <-------- @@@
577           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
578
579         }
580         else {
581           // Le fils est termine, on sort de la boucle et du if englobant
582           // @@@ --------> SECTION CRITIQUE <-------- @@@
583           pthread_mutex_lock(&_bm._threads_mutex);
584           _bm._threads[id].status       = DONE;
585           _bm._threads[id].param[STATE] = "Done";
586           pthread_mutex_unlock(&_bm._threads_mutex);
587           // @@@ --------> SECTION CRITIQUE <-------- @@@
588           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
589           break;
590         }
591       }
592       else if (child_wait_rc == -1) {
593         // Le fils a disparu ...
594         // @@@ --------> SECTION CRITIQUE <-------- @@@
595         pthread_mutex_lock(&_bm._threads_mutex);
596         _bm._threads[id].status       = DEAD;
597         _bm._threads[id].param[STATE] = "Dead";
598         pthread_mutex_unlock(&_bm._threads_mutex);
599         // @@@ --------> SECTION CRITIQUE <-------- @@@
600         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
601         break;
602       }
603 #endif
604
605       // On teste si le thread doit etre detruit
606       pthread_testcancel();
607
608
609
610       // On regarde si le fils n'a pas depasse son temps (wallclock time)
611       time_t child_currenttime = time(NULL);
612       time_t child_elapsedtime = child_currenttime - child_starttime;
613       if (param.find(MAXWALLTIME) != param.end()) {
614         int maxwalltime = param[MAXWALLTIME];
615         //        cout << "child_starttime          = " << child_starttime        << endl
616         //             << "child_currenttime        = " << child_currenttime      << endl
617         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
618         //             << "maxwalltime              = " << maxwalltime            << endl
619         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
620         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
621           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
622           // On introduit une commande dans la queue du thread
623           // @@@ --------> SECTION CRITIQUE <-------- @@@
624           pthread_mutex_lock(&_bm._threads_mutex);
625           if (_bm._threads.find(id) != _bm._threads.end())
626             _bm._threads[id].command_queue.push(KILL);
627           pthread_mutex_unlock(&_bm._threads_mutex);
628           // @@@ --------> SECTION CRITIQUE <-------- @@@
629
630
631         } else if (child_elapsedtime > maxwalltime ) {
632           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
633           // On introduit une commande dans la queue du thread
634           // @@@ --------> SECTION CRITIQUE <-------- @@@
635           pthread_mutex_lock(&_bm._threads_mutex);
636           if (_bm._threads.find(id) != _bm._threads.end())
637             _bm._threads[id].command_queue.push(TERM);
638           pthread_mutex_unlock(&_bm._threads_mutex);
639           // @@@ --------> SECTION CRITIQUE <-------- @@@
640         }
641       }
642
643
644
645       // On regarde s'il y a quelque chose a faire dans la queue de commande
646       // @@@ --------> SECTION CRITIQUE <-------- @@@
647       pthread_mutex_lock(&_bm._threads_mutex);
648       if (_bm._threads.find(id) != _bm._threads.end()) {
649         while (_bm._threads[id].command_queue.size() > 0) {
650           Commande cmd = _bm._threads[id].command_queue.front();
651           _bm._threads[id].command_queue.pop();
652
653           switch (cmd) {
654     case NOP:
655       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
656       break;
657 #ifndef WIN32
658     case HOLD:
659       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
660       kill(child, SIGSTOP);
661       break;
662
663     case RELEASE:
664       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
665       kill(child, SIGCONT);
666       break;
667
668     case TERM:
669       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
670       kill(child, SIGTERM);
671       break;
672
673     case KILL:
674       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
675       kill(child, SIGKILL);
676       break;
677 #endif
678     case ALTER:
679       break;
680
681     default:
682       break;
683           }
684         }
685
686       }
687       pthread_mutex_unlock(&_bm._threads_mutex);
688       // @@@ --------> SECTION CRITIQUE <-------- @@@
689
690       // On fait une petite pause pour ne pas surcharger inutilement le processeur
691 #ifdef WIN32
692       Sleep(1000);
693 #else
694       sleep(1);
695 #endif
696
697     }
698
699   }
700
701
702
703 #ifndef WIN32
704
705   void BatchManager_Local::ThreadAdapter::fils()
706   {
707     Parametre param = _job.getParametre();
708     Parametre::iterator it;
709
710     try {
711
712       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
713       char * execpath = NULL;
714       if (param.find(EXECUTABLE) != param.end()) {
715         string executable = _bm.exec_command(param);
716         execpath          = new char [executable.size() + 1];
717         strncpy(execpath, executable.c_str(), executable.size() + 1);
718       } else exit(1);
719
720       string debug_command = execpath;
721
722       string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
723
724       char **  argv = NULL;
725       if (param.find(ARGUMENTS) != param.end()) {
726         Versatile V = param[ARGUMENTS];
727
728         argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
729
730         argv[0] = new char [name.size() + 1];
731         strncpy(argv[0], name.c_str(), name.size() + 1);
732
733         debug_command  += string(" # ") + argv[0];
734
735         int i = 1;
736         for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
737           StringType argt = * static_cast<StringType *>(*it);
738           string     arg  = argt;
739           argv[i]         = new char [arg.size() + 1];
740           strncpy(argv[i], arg.c_str(), arg.size() + 1);
741           debug_command  += string(" # ") + argv[i];
742         }
743
744         // assert (i == V.size() + 1)
745         argv[i] = NULL;
746       }
747
748
749       UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
750
751
752
753       // Create the environment for the new process. Note (RB): Here we change the environment for
754       // the process launched in local. It would seem more logical to set the environment for the
755       // remote process.
756       Environnement env = _job.getEnvironnement();
757
758       char ** envp = NULL;
759       if(env.size() > 0) {
760         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
761         int i = 0;
762         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
763           const string  & key   = (*it).first;
764           const string  & value = (*it).second;
765           ostringstream oss;
766           oss << key << "=" << value;
767           envp[i]         = new char [oss.str().size() + 1];
768           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
769         }
770
771         // assert (i == env.size())
772         envp[i] = NULL;
773       }
774
775
776
777
778       // On positionne les limites systeme imposees au fils
779       if (param.find(MAXCPUTIME) != param.end()) {
780         int maxcputime = param[MAXCPUTIME];
781         struct rlimit limit;
782         limit.rlim_cur = maxcputime;
783         limit.rlim_max = int(maxcputime * 1.1);
784         setrlimit(RLIMIT_CPU, &limit);
785       }
786
787       if (param.find(MAXDISKSIZE) != param.end()) {
788         int maxdisksize = param[MAXDISKSIZE];
789         struct rlimit limit;
790         limit.rlim_cur = maxdisksize * 1024;
791         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
792         setrlimit(RLIMIT_FSIZE, &limit);
793       }
794
795       if (param.find(MAXRAMSIZE) != param.end()) {
796         int maxramsize = param[MAXRAMSIZE];
797         struct rlimit limit;
798         limit.rlim_cur = maxramsize * 1024;
799         limit.rlim_max = int(maxramsize * 1.1) * 1024;
800         setrlimit(RLIMIT_AS, &limit);
801       }
802
803
804
805       // On cree une session pour le fils de facon a ce qu'il ne soit pas
806       // detruit lorsque le shell se termine (le shell ouvre une session et
807       // tue tous les process appartenant a la session en quittant)
808       setsid();
809
810
811       // On ferme les descripteurs de fichiers standards
812       //close(STDIN_FILENO);
813       //close(STDOUT_FILENO);
814       //close(STDERR_FILENO);
815
816
817       // On execute la commande du fils
818       execve(execpath, argv, envp);
819
820       // No need to deallocate since nothing happens after a successful exec
821
822       // Normalement on ne devrait jamais arriver ici
823       ofstream file_err("error.log");
824       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
825
826     } catch (GenericException & e) {
827
828       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
829     }
830
831     exit(99);
832   }
833
834 #else
835
836   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
837   {
838     Parametre param = _job.getParametre();
839     Parametre::iterator it;
840     PROCESS_INFORMATION pi;
841
842     try {
843
844       // EXECUTABLE is MANDATORY, if missing, we throw an exception
845       string exec_command;
846       if (param.find(EXECUTABLE) != param.end()) {
847         exec_command = _bm.exec_command(param);
848       } else {
849         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
850       }
851
852       if (param.find(ARGUMENTS) != param.end()) {
853         Versatile V = param[ARGUMENTS];
854
855         for(Versatile::const_iterator it=V.begin() ; it!=V.end() ; it++) {
856           StringType argt = * static_cast<StringType *>(*it);
857           exec_command += string(" ") + string(argt);
858         }
859       }
860
861       UNDER_LOCK( cout << "*** exec_command = " << exec_command << endl );
862
863
864       // Create the environment for the new process. Note (RB): Here we change the environment for
865       // the process launched in local. It would seem more logical to set the environment for the
866       // remote process.
867       // Note that if no environment is specified, we reuse the current environment.
868       Environnement env = _job.getEnvironnement();
869       char * chNewEnv = NULL;
870
871       if(env.size() > 0) {
872         chNewEnv = new char[4096];
873         LPTSTR lpszCurrentVariable = chNewEnv;
874         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
875           const string  & key   = (*it).first;
876           const string  & value = (*it).second;
877           string envvar = key + "=" + value;
878           envvar.copy(lpszCurrentVariable, envvar.size());
879           lpszCurrentVariable[envvar.size()] = '\0';
880           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
881         }
882         // Terminate the block with a NULL byte.
883         *lpszCurrentVariable = '\0';
884       }
885
886
887       STARTUPINFO si;
888       ZeroMemory( &si, sizeof(si) );
889       si.cb = sizeof(si);
890       ZeroMemory( &pi, sizeof(pi) );
891
892       // Copy the command to a non-const buffer
893       char * buffer = strdup(exec_command.c_str());
894
895       // launch the new process
896       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
897                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
898
899       if (buffer) free(buffer);
900       if (!res) throw RunTimeException("Error while creating new process");
901
902       CloseHandle(pi.hThread);
903
904     } catch (GenericException & e) {
905
906       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
907     }
908
909     return pi.hProcess;
910   }
911
912 #endif
913
914
915   void BatchManager_Local::kill_child_on_exit(void * p_pid)
916   {
917 #ifndef WIN32
918     //TODO: porting of following functionality
919     pid_t child = * static_cast<pid_t *>(p_pid);
920
921     // On tue le fils
922     kill(child, SIGTERM);
923
924     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
925     // mais cette option n'est pas implementee pour le moment, car il est
926     // preferable de laisser le process fils se terminer normalement et seul.
927 #endif
928   }
929
930   void BatchManager_Local::delete_on_exit(void * arg)
931   {
932     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
933     delete p_ta;
934   }
935
936 }