Salome HOME
Refactored submission based on SH, RSH and SSH by grouping related commands in Commun...
[tools/libbatch.git] / src / Local / Batch_BatchManager_Local.cxx
1 //  Copyright (C) 2007-2008  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 //  Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 //  CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 //  This library is free software; you can redistribute it and/or
7 //  modify it under the terms of the GNU Lesser General Public
8 //  License as published by the Free Software Foundation; either
9 //  version 2.1 of the License.
10 //
11 //  This library is distributed in the hope that it will be useful,
12 //  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 //  Lesser General Public License for more details.
15 //
16 //  You should have received a copy of the GNU Lesser General Public
17 //  License along with this library; if not, write to the Free Software
18 //  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 //  See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22 /*
23 * BatchManager_Local.cxx :
24 *
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail   : mailto:ivan.dutka-malen@der.edf.fr
27 * Date   : Thu Nov  6 10:17:22 2003
28 * Projet : Salome 2
29 *
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
32 * managers.
33 */
34
35 #include <iostream>
36 #include <fstream>
37 #include <sstream>
38 #include <cstdlib>
39 #include <limits.h>
40
41 #include <sys/types.h>
42 #ifdef WIN32
43 #include <direct.h>
44 #include "Batch_RunTimeException.hxx"
45 #else
46 #include <sys/wait.h>
47 #include <unistd.h>
48 #endif
49 #include <ctime>
50 #include <pthread.h>
51 #include <signal.h>
52 #include <errno.h>
53 #include <string.h>
54 #include "Batch_IOMutex.hxx"
55 #include "Batch_BatchManager_Local.hxx"
56
57 using namespace std;
58
59 namespace Batch {
60
61
62   // Constructeur
63   BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64                                          CommunicationProtocolType protocolType)
65       throw(InvalidArgumentException,ConnexionFailureException)
66     : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
67       _protocol(CommunicationProtocol::getInstance(protocolType)),
68       _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
69   {
70     pthread_mutex_init(&_threads_mutex, NULL);
71     pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
72     pthread_cond_init(&_thread_id_id_association_cond, NULL);
73   }
74
75   // Destructeur
76   BatchManager_Local::~BatchManager_Local()
77   {
78     pthread_mutex_destroy(&_threads_mutex);
79     pthread_mutex_destroy(&_thread_id_id_association_mutex);
80     pthread_cond_destroy(&_thread_id_id_association_cond);
81   }
82
83   const CommunicationProtocol & BatchManager_Local::getProtocol() const
84   {
85     return _protocol;
86   }
87
88   // Methode pour le controle des jobs : soumet un job au gestionnaire
89   const JobId BatchManager_Local::submitJob(const Job & job)
90   {
91     Job_Local jobLocal = job;
92
93     pthread_t thread_id = submit(jobLocal);
94
95     ostringstream oss;
96     oss << getIdByThread_id(thread_id);
97
98     JobId id(this, oss.str());
99
100     return id;
101   }
102
103   // Methode pour le controle des jobs : retire un job du gestionnaire
104   void BatchManager_Local::deleteJob(const JobId & jobid)
105   {
106     Id id;
107
108     istringstream iss(jobid.getReference());
109     iss >> id;
110
111     // On retrouve le thread_id du thread
112     pthread_t thread_id;
113
114     // @@@ --------> SECTION CRITIQUE <-------- @@@
115     pthread_mutex_lock(&_threads_mutex);
116     if (_threads.find(id) != _threads.end())
117       thread_id = _threads[id].thread_id;
118     pthread_mutex_unlock(&_threads_mutex);
119     // @@@ --------> SECTION CRITIQUE <-------- @@@
120
121     cancel(thread_id);
122   }
123
124   // Methode pour le controle des jobs : suspend un job en file d'attente
125   void BatchManager_Local::holdJob(const JobId & jobid)
126   {
127     Id id;
128     istringstream iss(jobid.getReference());
129     iss >> id;
130
131     UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
132
133     // On introduit une commande dans la queue du thread
134     // @@@ --------> SECTION CRITIQUE <-------- @@@
135     pthread_mutex_lock(&_threads_mutex);
136     if (_threads.find(id) != _threads.end())
137       _threads[id].command_queue.push(HOLD);
138     pthread_mutex_unlock(&_threads_mutex);
139     // @@@ --------> SECTION CRITIQUE <-------- @@@
140   }
141
142   // Methode pour le controle des jobs : relache un job suspendu
143   void BatchManager_Local::releaseJob(const JobId & jobid)
144   {
145     Id id;
146     istringstream iss(jobid.getReference());
147     iss >> id;
148
149     UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
150
151     // On introduit une commande dans la queue du thread
152     // @@@ --------> SECTION CRITIQUE <-------- @@@
153     pthread_mutex_lock(&_threads_mutex);
154     if (_threads.find(id) != _threads.end())
155       _threads[id].command_queue.push(RELEASE);
156     pthread_mutex_unlock(&_threads_mutex);
157     // @@@ --------> SECTION CRITIQUE <-------- @@@
158   }
159
160
161   // Methode pour le controle des jobs : modifie un job en file d'attente
162   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
163   {
164   }
165
166   // Methode pour le controle des jobs : modifie un job en file d'attente
167   void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
168   {
169     alterJob(jobid, param, Environnement());
170   }
171
172   // Methode pour le controle des jobs : modifie un job en file d'attente
173   void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
174   {
175     alterJob(jobid, Parametre(), env);
176   }
177
178
179
180   // Methode pour le controle des jobs : renvoie l'etat du job
181   JobInfo BatchManager_Local::queryJob(const JobId & jobid)
182   {
183     Id id;
184     istringstream iss(jobid.getReference());
185     iss >> id;
186
187     Parametre param;
188     Environnement env;
189
190     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
191     // @@@ --------> SECTION CRITIQUE <-------- @@@
192     pthread_mutex_lock(&_threads_mutex);
193     param = _threads[id].param;
194     env   = _threads[id].env;
195     pthread_mutex_unlock(&_threads_mutex);
196     // @@@ --------> SECTION CRITIQUE <-------- @@@
197     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
198
199     JobInfo_Local ji(param, env);
200     return ji;
201   }
202
203
204
205   // Methode pour le controle des jobs : teste si un job est present en machine
206   bool BatchManager_Local::isRunning(const JobId & jobid)
207   {
208     Id id;
209     istringstream iss(jobid.getReference());
210     iss >> id;
211
212     Status status;
213
214     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
215     // @@@ --------> SECTION CRITIQUE <-------- @@@
216     pthread_mutex_lock(&_threads_mutex);
217     status = _threads[id].status;
218     pthread_mutex_unlock(&_threads_mutex);
219     // @@@ --------> SECTION CRITIQUE <-------- @@@
220     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
221
222     return (status == RUNNING);
223   }
224
225
226   // Methode d'execution d'un job
227   pthread_t BatchManager_Local::submit(const Job_Local & job)
228   {
229     // L'id du thread a creer
230     pthread_t thread_id =
231 #ifdef WIN32
232     {0,0};
233 #else
234       0;
235 #endif
236
237     // Les attributs du thread a sa creation
238     pthread_attr_t thread_attr;
239     pthread_attr_init(&thread_attr);
240     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
241
242     ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
243
244     // Creation du thread qui va executer la commande systeme qu'on lui passe
245     int rc = pthread_create(&thread_id,
246       &thread_attr,
247       &ThreadAdapter::run,
248       static_cast<void *>(p_ta));
249     if (rc) {
250     }
251
252     // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
253     pthread_attr_destroy(&thread_attr);
254
255     return thread_id;
256   }
257
258
259   // Methode de destruction d'un job
260   void BatchManager_Local::cancel(pthread_t thread_id)
261   {
262     pthread_cancel(thread_id);
263   }
264
265
266   vector<string> BatchManager_Local::exec_command(const Parametre & param) const
267   {
268     ostringstream exec_sub_cmd;
269
270 #ifdef WIN32
271     char drive[_MAX_DRIVE];
272     _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
273     if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
274 #endif
275
276     exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
277
278     if (param.find(ARGUMENTS) != param.end()) {
279       Versatile V = param[ARGUMENTS];
280       for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
281         StringType argt = * static_cast<StringType *>(*it);
282         string     arg  = argt;
283         exec_sub_cmd << " " << arg;
284       }
285     }
286
287     string user;
288     Parametre::const_iterator it = param.find(USER);
289     if (it != param.end()) {
290       user = string(it->second);
291     }
292
293     return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
294   }
295
296
297   // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
298   // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
299   // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
300   // Thread_id / Id (_thread_id_id_association_mutex)
301   BatchManager_Local::Id BatchManager_Local::nextId()
302   {
303     static Id id = 0;
304     Id nextId = id++;
305     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
306     return nextId;
307   }
308
309
310   // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
311   BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
312   {
313     // @@@ --------> SECTION CRITIQUE <-------- @@@
314     pthread_mutex_lock(&_thread_id_id_association_mutex);
315     bool threadIdFound = false;
316     std::list<struct ThreadIdIdAssociation>::iterator it;
317     while (!threadIdFound) {
318       for (it = _thread_id_id_association.begin() ;
319            it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
320            it++);
321       if (it == _thread_id_id_association.end())
322         pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
323       else
324         threadIdFound = true;
325     }
326
327     Id id = it->id;
328     _thread_id_id_association.erase(it);
329
330     pthread_mutex_unlock(&_thread_id_id_association_mutex);
331     // @@@ --------> SECTION CRITIQUE <-------- @@@
332
333     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
334     return id;
335   }
336
337
338   // Associe un Thread_id a un Id nouvellement cree
339   BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
340   {
341     Id id = -1;
342
343     // @@@ --------> SECTION CRITIQUE <-------- @@@
344     pthread_mutex_lock(&_thread_id_id_association_mutex);
345     std::list<struct ThreadIdIdAssociation>::iterator it;
346     for (it = _thread_id_id_association.begin() ;
347          it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
348          it++);
349     if (it == _thread_id_id_association.end()) {
350       struct ThreadIdIdAssociation newAssociation;
351       id = newAssociation.id = nextId();
352       newAssociation.threadId = thread_id;
353       _thread_id_id_association.push_back(newAssociation);
354       pthread_cond_signal(&_thread_id_id_association_cond);
355
356     } else {
357       UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
358     }
359     pthread_mutex_unlock(&_thread_id_id_association_mutex);
360     // @@@ --------> SECTION CRITIQUE <-------- @@@
361
362     //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
363     return id;
364   }
365
366
367   // Constructeur de la classe ThreadAdapter
368   BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
369   _bm(bm), _job(job)
370   {
371     // Nothing to do
372   }
373
374
375
376   // Methode d'execution du thread
377   void * BatchManager_Local::ThreadAdapter::run(void * arg)
378   {
379 #ifndef WIN32
380     // On bloque tous les signaux pour ce thread
381     sigset_t setmask;
382     sigfillset(&setmask);
383     pthread_sigmask(SIG_BLOCK, &setmask, NULL);
384 #endif
385
386     // On autorise la terminaison differee du thread
387     // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
388     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,  NULL);
389     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
390
391     // On enregistre la fonction de suppression du fils en cas d'arret du thread
392     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
393     // sera prise en compte par pthread_testcancel()
394     Process child;
395     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
396     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
397
398     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
399
400
401
402
403     // Le code retour cumule (ORed) de tous les appels
404     // Nul en cas de reussite de l'ensemble des operations
405     int rc = 0;
406
407     // Cette table contient la liste des fichiers a detruire a la fin du processus
408     std::vector<string> files_to_delete;
409
410
411
412     // On copie les fichiers d'entree pour le fils
413     const Parametre param   = p_ta->_job.getParametre();
414     Parametre::const_iterator it;
415
416     // On initialise la variable workdir a la valeur du Current Working Directory
417     char * cwd =
418 #ifdef WIN32
419       _getcwd(NULL, 0);
420 #else
421       new char [PATH_MAX];
422     getcwd(cwd, PATH_MAX);
423 #endif
424     string workdir = cwd;
425     delete [] cwd;
426
427     if ( (it = param.find(WORKDIR)) != param.end() ) {
428       workdir = static_cast<string>( (*it).second );
429     }
430
431     string executionhost = string(param[EXECUTIONHOST]);
432     string user;
433     if ( (it = param.find(USER)) != param.end() ) {
434       user = string(it->second);
435     }
436
437     if ( (it = param.find(INFILE)) != param.end() ) {
438       Versatile V = (*it).second;
439       Versatile::iterator Vit;
440
441       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
442         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
443         Couple cp       = cpt;
444         string local    = cp.getLocal();
445         string remote   = cp.getRemote();
446
447         int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
448                                                                     workdir + "/" + remote,
449                                                                     executionhost, user);
450         if (status) {
451           // Echec de la copie
452           rc |= 1;
453         } else {
454           // On enregistre le fichier comme etant a detruire
455           files_to_delete.push_back(workdir + "/" + remote);
456         }
457
458       }
459     }
460
461
462
463
464     // On forke/exec un nouveau process pour pouvoir controler le fils
465     // (plus finement qu'avec un appel system)
466     // int rc = system(commande.c_str());
467 #ifdef WIN32
468     child = p_ta->launchWin32ChildProcess();
469     p_ta->pere(child);
470 #else
471     child = fork();
472     if (child < 0) { // erreur
473       UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
474
475     } else if (child > 0) { // pere
476       p_ta->pere(child);
477
478     } else { // fils
479       p_ta->fils();
480     }
481 #endif
482
483
484     // On copie les fichiers de sortie du fils
485     if ( (it = param.find(OUTFILE)) != param.end() ) {
486       Versatile V = (*it).second;
487       Versatile::iterator Vit;
488
489       for(Vit=V.begin(); Vit!=V.end(); Vit++) {
490         CoupleType cpt  = *static_cast< CoupleType * >(*Vit);
491         Couple cp       = cpt;
492         string local    = cp.getLocal();
493         string remote   = cp.getRemote();
494
495         int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
496                                                                     executionhost, user,
497                                                                     local, "", "");
498         if (status) {
499           // Echec de la copie
500           rc |= 1;
501         } else {
502           // On enregistre le fichier comme etant a detruire
503           files_to_delete.push_back(workdir + "/" + remote);
504         }
505
506       }
507     }
508
509     // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
510     // ou si la creation du fils n'a pu avoir lieu
511     if ( (rc == 0) || (child < 0) ) {
512       std::vector<string>::const_iterator it;
513       for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
514         p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
515 /*        string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
516         UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
517 #ifdef WIN32
518         remove_cmd = string("\"") + remove_cmd + string("\"");
519 #endif
520         system(remove_cmd.c_str());*/
521       }
522     }
523
524
525
526     // On retire la fonction de nettoyage de la memoire
527     pthread_cleanup_pop(0);
528
529     // On retire la fonction de suppression du fils
530     pthread_cleanup_pop(0);
531
532
533
534     // On invoque la fonction de nettoyage de la memoire
535     delete_on_exit(arg);
536
537     UNDER_LOCK( cout << "Father is leaving" << endl );
538     pthread_exit(NULL);
539     return NULL;
540   }
541
542
543
544
545   void BatchManager_Local::ThreadAdapter::pere(Process child)
546   {
547     time_t child_starttime = time(NULL);
548
549     // On enregistre le fils dans la table des threads
550     pthread_t thread_id = pthread_self();
551     Id id = _bm.registerThread_id(thread_id);
552
553     Parametre param   = _job.getParametre();
554     Environnement env = _job.getEnvironnement();
555
556     ostringstream thread_id_sst;
557     thread_id_sst << id;
558     param[ID]         = thread_id_sst.str();
559     param[STATE]      = "Running";
560 #ifndef WIN32
561     param[PID]        = child;
562 #endif
563
564     // @@@ --------> SECTION CRITIQUE <-------- @@@
565     pthread_mutex_lock(&_bm._threads_mutex);
566     _bm._threads[id].thread_id = thread_id;
567 #ifndef WIN32
568     _bm._threads[id].pid       = child;
569 #endif
570     _bm._threads[id].status    = RUNNING;
571     _bm._threads[id].param     = param;
572     _bm._threads[id].env       = env;
573     _bm._threads[id].command_queue.push(NOP);
574     pthread_mutex_unlock(&_bm._threads_mutex);
575     // @@@ --------> SECTION CRITIQUE <-------- @@@
576
577
578
579
580     // on boucle en attendant que le fils ait termine
581     while (1) {
582 #ifdef WIN32
583       DWORD exitCode;
584       BOOL res = GetExitCodeProcess(child, &exitCode);
585       if (exitCode != STILL_ACTIVE) {
586         pthread_mutex_lock(&_bm._threads_mutex);
587         _bm._threads[id].status       = DONE;
588         _bm._threads[id].param[STATE] = "Done";
589         pthread_mutex_unlock(&_bm._threads_mutex);
590         // @@@ --------> SECTION CRITIQUE <-------- @@@
591         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
592         break;
593       }
594 #else
595       int child_rc = 0;
596       pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
597       if (child_wait_rc > 0) {
598         if (WIFSTOPPED(child_rc)) {
599           // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
600           // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
601           // desactive car s'il est possible de detecter l'arret d'un process, il est
602           // plus difficile de detecter sa reprise.
603
604           // Le fils est simplement stoppe
605           // @@@ --------> SECTION CRITIQUE <-------- @@@
606           pthread_mutex_lock(&_bm._threads_mutex);
607           _bm._threads[id].status       = STOPPED;
608           _bm._threads[id].param[STATE] = "Stopped";
609           pthread_mutex_unlock(&_bm._threads_mutex);
610           // @@@ --------> SECTION CRITIQUE <-------- @@@
611           UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
612
613         }
614         else {
615           // Le fils est termine, on sort de la boucle et du if englobant
616           // @@@ --------> SECTION CRITIQUE <-------- @@@
617           pthread_mutex_lock(&_bm._threads_mutex);
618           _bm._threads[id].status       = DONE;
619           _bm._threads[id].param[STATE] = "Done";
620           pthread_mutex_unlock(&_bm._threads_mutex);
621           // @@@ --------> SECTION CRITIQUE <-------- @@@
622           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
623           break;
624         }
625       }
626       else if (child_wait_rc == -1) {
627         // Le fils a disparu ...
628         // @@@ --------> SECTION CRITIQUE <-------- @@@
629         pthread_mutex_lock(&_bm._threads_mutex);
630         _bm._threads[id].status       = DEAD;
631         _bm._threads[id].param[STATE] = "Dead";
632         pthread_mutex_unlock(&_bm._threads_mutex);
633         // @@@ --------> SECTION CRITIQUE <-------- @@@
634         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
635         break;
636       }
637 #endif
638
639       // On teste si le thread doit etre detruit
640       pthread_testcancel();
641
642
643
644       // On regarde si le fils n'a pas depasse son temps (wallclock time)
645       time_t child_currenttime = time(NULL);
646       time_t child_elapsedtime = child_currenttime - child_starttime;
647       if (param.find(MAXWALLTIME) != param.end()) {
648         int maxwalltime = param[MAXWALLTIME];
649         //        cout << "child_starttime          = " << child_starttime        << endl
650         //             << "child_currenttime        = " << child_currenttime      << endl
651         //             << "child_elapsedtime        = " << child_elapsedtime      << endl
652         //             << "maxwalltime              = " << maxwalltime            << endl
653         //             << "int(maxwalltime * 1.1)   = " << int(maxwalltime * 1.1) << endl;
654         if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
655           UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
656           // On introduit une commande dans la queue du thread
657           // @@@ --------> SECTION CRITIQUE <-------- @@@
658           pthread_mutex_lock(&_bm._threads_mutex);
659           if (_bm._threads.find(id) != _bm._threads.end())
660             _bm._threads[id].command_queue.push(KILL);
661           pthread_mutex_unlock(&_bm._threads_mutex);
662           // @@@ --------> SECTION CRITIQUE <-------- @@@
663
664
665         } else if (child_elapsedtime > maxwalltime ) {
666           UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
667           // On introduit une commande dans la queue du thread
668           // @@@ --------> SECTION CRITIQUE <-------- @@@
669           pthread_mutex_lock(&_bm._threads_mutex);
670           if (_bm._threads.find(id) != _bm._threads.end())
671             _bm._threads[id].command_queue.push(TERM);
672           pthread_mutex_unlock(&_bm._threads_mutex);
673           // @@@ --------> SECTION CRITIQUE <-------- @@@
674         }
675       }
676
677
678
679       // On regarde s'il y a quelque chose a faire dans la queue de commande
680       // @@@ --------> SECTION CRITIQUE <-------- @@@
681       pthread_mutex_lock(&_bm._threads_mutex);
682       if (_bm._threads.find(id) != _bm._threads.end()) {
683         while (_bm._threads[id].command_queue.size() > 0) {
684           Commande cmd = _bm._threads[id].command_queue.front();
685           _bm._threads[id].command_queue.pop();
686
687           switch (cmd) {
688     case NOP:
689       UNDER_LOCK( cout << "Father does nothing to his child" << endl );
690       break;
691 #ifndef WIN32
692     case HOLD:
693       UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
694       kill(child, SIGSTOP);
695       break;
696
697     case RELEASE:
698       UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
699       kill(child, SIGCONT);
700       break;
701
702     case TERM:
703       UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
704       kill(child, SIGTERM);
705       break;
706
707     case KILL:
708       UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
709       kill(child, SIGKILL);
710       break;
711 #endif
712     case ALTER:
713       break;
714
715     default:
716       break;
717           }
718         }
719
720       }
721       pthread_mutex_unlock(&_bm._threads_mutex);
722       // @@@ --------> SECTION CRITIQUE <-------- @@@
723
724       // On fait une petite pause pour ne pas surcharger inutilement le processeur
725 #ifdef WIN32
726       Sleep(1000);
727 #else
728       sleep(1);
729 #endif
730
731     }
732
733   }
734
735
736
737 #ifndef WIN32
738
739   void BatchManager_Local::ThreadAdapter::fils()
740   {
741     Parametre param = _job.getParametre();
742     Parametre::iterator it;
743
744     try {
745
746       // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
747       vector<string> command;
748       if (param.find(EXECUTABLE) != param.end()) {
749         command = _bm.exec_command(param);
750       } else exit(1);
751
752       // Build the argument array argv from the command
753       char ** argv = new char * [command.size() + 1];
754       string comstr;
755       for (string::size_type i=0 ; i<command.size() ; i++) {
756         argv[i] = new char[command[i].size() + 1];
757         strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
758         if (i>0) comstr += " # ";
759         comstr += command[i];
760       }
761
762       argv[command.size()] = NULL;
763
764       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
765
766       // Create the environment for the new process. Note (RB): Here we change the environment for
767       // the process launched in local. It would seem more logical to set the environment for the
768       // remote process.
769       Environnement env = _job.getEnvironnement();
770
771       char ** envp = NULL;
772       if(env.size() > 0) {
773         envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
774         int i = 0;
775         for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
776           const string  & key   = (*it).first;
777           const string  & value = (*it).second;
778           ostringstream oss;
779           oss << key << "=" << value;
780           envp[i]         = new char [oss.str().size() + 1];
781           strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
782         }
783
784         // assert (i == env.size())
785         envp[i] = NULL;
786       }
787
788
789
790
791       // On positionne les limites systeme imposees au fils
792       if (param.find(MAXCPUTIME) != param.end()) {
793         int maxcputime = param[MAXCPUTIME];
794         struct rlimit limit;
795         limit.rlim_cur = maxcputime;
796         limit.rlim_max = int(maxcputime * 1.1);
797         setrlimit(RLIMIT_CPU, &limit);
798       }
799
800       if (param.find(MAXDISKSIZE) != param.end()) {
801         int maxdisksize = param[MAXDISKSIZE];
802         struct rlimit limit;
803         limit.rlim_cur = maxdisksize * 1024;
804         limit.rlim_max = int(maxdisksize * 1.1) * 1024;
805         setrlimit(RLIMIT_FSIZE, &limit);
806       }
807
808       if (param.find(MAXRAMSIZE) != param.end()) {
809         int maxramsize = param[MAXRAMSIZE];
810         struct rlimit limit;
811         limit.rlim_cur = maxramsize * 1024;
812         limit.rlim_max = int(maxramsize * 1.1) * 1024;
813         setrlimit(RLIMIT_AS, &limit);
814       }
815
816
817
818       // On cree une session pour le fils de facon a ce qu'il ne soit pas
819       // detruit lorsque le shell se termine (le shell ouvre une session et
820       // tue tous les process appartenant a la session en quittant)
821       setsid();
822
823
824       // On ferme les descripteurs de fichiers standards
825       //close(STDIN_FILENO);
826       //close(STDOUT_FILENO);
827       //close(STDERR_FILENO);
828
829
830       // On execute la commande du fils
831       execve(argv[0], argv, envp);
832
833       // No need to deallocate since nothing happens after a successful exec
834
835       // Normalement on ne devrait jamais arriver ici
836       ofstream file_err("error.log");
837       UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
838
839     } catch (GenericException & e) {
840
841       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
842     }
843
844     exit(99);
845   }
846
847 #else
848
849   BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
850   {
851     Parametre param = _job.getParametre();
852     Parametre::iterator it;
853     PROCESS_INFORMATION pi;
854
855     try {
856
857       // EXECUTABLE is MANDATORY, if missing, we throw an exception
858       vector<string> exec_command;
859       if (param.find(EXECUTABLE) != param.end()) {
860         exec_command = _bm.exec_command(param);
861       } else {
862         throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
863       }
864
865       // Build the command string from the command argument vector
866       string comstr;
867       for (unsigned int i=0 ; i<exec_command.size() ; i++) {
868         if (i>0) comstr += " ";
869         comstr += exec_command[i];
870       }
871
872       UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
873
874       // Create the environment for the new process. Note (RB): Here we change the environment for
875       // the process launched in local. It would seem more logical to set the environment for the
876       // remote process.
877       // Note that if no environment is specified, we reuse the current environment.
878       Environnement env = _job.getEnvironnement();
879       char * chNewEnv = NULL;
880
881       if(env.size() > 0) {
882         chNewEnv = new char[4096];
883         LPTSTR lpszCurrentVariable = chNewEnv;
884         for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
885           const string  & key   = (*it).first;
886           const string  & value = (*it).second;
887           string envvar = key + "=" + value;
888           envvar.copy(lpszCurrentVariable, envvar.size());
889           lpszCurrentVariable[envvar.size()] = '\0';
890           lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
891         }
892         // Terminate the block with a NULL byte.
893         *lpszCurrentVariable = '\0';
894       }
895
896
897       STARTUPINFO si;
898       ZeroMemory( &si, sizeof(si) );
899       si.cb = sizeof(si);
900       ZeroMemory( &pi, sizeof(pi) );
901
902       // Copy the command to a non-const buffer
903       char * buffer = strdup(comstr.c_str());
904
905       // launch the new process
906       BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
907                                CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
908
909       if (buffer) free(buffer);
910       if (!res) throw RunTimeException("Error while creating new process");
911
912       CloseHandle(pi.hThread);
913
914     } catch (GenericException & e) {
915
916       std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
917     }
918
919     return pi.hProcess;
920   }
921
922 #endif
923
924
925   void BatchManager_Local::kill_child_on_exit(void * p_pid)
926   {
927 #ifndef WIN32
928     //TODO: porting of following functionality
929     pid_t child = * static_cast<pid_t *>(p_pid);
930
931     // On tue le fils
932     kill(child, SIGTERM);
933
934     // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
935     // mais cette option n'est pas implementee pour le moment, car il est
936     // preferable de laisser le process fils se terminer normalement et seul.
937 #endif
938   }
939
940   void BatchManager_Local::delete_on_exit(void * arg)
941   {
942     ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
943     delete p_ta;
944   }
945
946 }