2 * BatchManager_Local.cxx :
4 * Auteur : Ivan DUTKA-MALEN - EDF R&D
5 * Mail : mailto:ivan.dutka-malen@der.edf.fr
6 * Date : Thu Nov 6 10:17:22 2003
15 #include <sys/types.h>
23 #include "Batch_IOMutex.hxx"
24 #include "Batch_BatchManager_Local.hxx"
30 BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host) throw(InvalidArgumentException,ConnexionFailureException) : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
32 pthread_mutex_init(&_threads_mutex, NULL);
33 pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
34 pthread_cond_init(&_thread_id_id_association_cond, NULL);
38 BatchManager_Local::~BatchManager_Local()
40 pthread_mutex_destroy(&_threads_mutex);
41 pthread_mutex_destroy(&_thread_id_id_association_mutex);
42 pthread_cond_destroy(&_thread_id_id_association_cond);
45 // Methode pour le controle des jobs : soumet un job au gestionnaire
46 const JobId BatchManager_Local::submitJob(const Job & job)
48 Job_Local jobLocal = job;
50 pthread_t thread_id = submit(jobLocal);
53 oss << getIdByThread_id(thread_id);
55 JobId id(this, oss.str());
60 // Methode pour le controle des jobs : retire un job du gestionnaire
61 void BatchManager_Local::deleteJob(const JobId & jobid)
65 istringstream iss(jobid.getReference());
68 // On retrouve le thread_id du thread
71 // @@@ --------> SECTION CRITIQUE <-------- @@@
72 pthread_mutex_lock(&_threads_mutex);
73 if (_threads.find(id) != _threads.end())
74 thread_id = _threads[id].thread_id;
75 pthread_mutex_unlock(&_threads_mutex);
76 // @@@ --------> SECTION CRITIQUE <-------- @@@
81 // Methode pour le controle des jobs : suspend un job en file d'attente
82 void BatchManager_Local::holdJob(const JobId & jobid)
85 istringstream iss(jobid.getReference());
88 UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
90 // On introduit une commande dans la queue du thread
91 // @@@ --------> SECTION CRITIQUE <-------- @@@
92 pthread_mutex_lock(&_threads_mutex);
93 if (_threads.find(id) != _threads.end())
94 _threads[id].command_queue.push(HOLD);
95 pthread_mutex_unlock(&_threads_mutex);
96 // @@@ --------> SECTION CRITIQUE <-------- @@@
99 // Methode pour le controle des jobs : relache un job suspendu
100 void BatchManager_Local::releaseJob(const JobId & jobid)
103 istringstream iss(jobid.getReference());
106 UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
108 // On introduit une commande dans la queue du thread
109 // @@@ --------> SECTION CRITIQUE <-------- @@@
110 pthread_mutex_lock(&_threads_mutex);
111 if (_threads.find(id) != _threads.end())
112 _threads[id].command_queue.push(RELEASE);
113 pthread_mutex_unlock(&_threads_mutex);
114 // @@@ --------> SECTION CRITIQUE <-------- @@@
118 // Methode pour le controle des jobs : modifie un job en file d'attente
119 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
123 // Methode pour le controle des jobs : modifie un job en file d'attente
124 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
126 alterJob(jobid, param, Environnement());
129 // Methode pour le controle des jobs : modifie un job en file d'attente
130 void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
132 alterJob(jobid, Parametre(), env);
137 // Methode pour le controle des jobs : renvoie l'etat du job
138 JobInfo BatchManager_Local::queryJob(const JobId & jobid)
141 istringstream iss(jobid.getReference());
147 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
148 // @@@ --------> SECTION CRITIQUE <-------- @@@
149 pthread_mutex_lock(&_threads_mutex);
150 param = _threads[id].param;
151 env = _threads[id].env;
152 pthread_mutex_unlock(&_threads_mutex);
153 // @@@ --------> SECTION CRITIQUE <-------- @@@
154 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
156 JobInfo_Local ji(param, env);
162 // Methode pour le controle des jobs : teste si un job est present en machine
163 bool BatchManager_Local::isRunning(const JobId & jobid)
166 istringstream iss(jobid.getReference());
171 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
172 // @@@ --------> SECTION CRITIQUE <-------- @@@
173 pthread_mutex_lock(&_threads_mutex);
174 status = _threads[id].status;
175 pthread_mutex_unlock(&_threads_mutex);
176 // @@@ --------> SECTION CRITIQUE <-------- @@@
177 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
179 return (status == RUNNING);
183 // Methode d'execution d'un job
184 pthread_t BatchManager_Local::submit(const Job_Local & job)
186 // L'id du thread a creer
187 pthread_t thread_id = 0;
189 // Les attributs du thread a sa creation
190 pthread_attr_t thread_attr;
191 pthread_attr_init(&thread_attr);
192 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
194 ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
196 // Creation du thread qui va executer la commande systeme qu'on lui passe
197 int rc = pthread_create(&thread_id,
200 static_cast<void *>(p_ta));
204 // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
205 pthread_attr_destroy(&thread_attr);
211 // Methode de destruction d'un job
212 void BatchManager_Local::cancel(pthread_t thread_id)
214 pthread_cancel(thread_id);
218 // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
219 // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
220 // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
221 // Thread_id / Id (_thread_id_id_association_mutex)
222 BatchManager_Local::Id BatchManager_Local::nextId()
226 //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
231 // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
232 BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
236 // @@@ --------> SECTION CRITIQUE <-------- @@@
237 pthread_mutex_lock(&_thread_id_id_association_mutex);
238 while (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end())
239 pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
241 id = _thread_id_id_association[thread_id];
242 _thread_id_id_association.erase(thread_id);
244 pthread_mutex_unlock(&_thread_id_id_association_mutex);
245 // @@@ --------> SECTION CRITIQUE <-------- @@@
247 //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
252 // Associe un Thread_id a un Id nouvellement cree
253 BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
257 // @@@ --------> SECTION CRITIQUE <-------- @@@
258 pthread_mutex_lock(&_thread_id_id_association_mutex);
259 if (_thread_id_id_association.find(thread_id) == _thread_id_id_association.end()) {
260 id = _thread_id_id_association[thread_id] = nextId();
261 pthread_cond_signal(&_thread_id_id_association_cond);
264 UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
266 pthread_mutex_unlock(&_thread_id_id_association_mutex);
267 // @@@ --------> SECTION CRITIQUE <-------- @@@
269 //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
274 // Constructeur de la classe ThreadAdapter
275 BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
283 // Methode d'execution du thread
284 void * BatchManager_Local::ThreadAdapter::run(void * arg)
286 // On bloque tous les signaux pour ce thread
288 sigfillset(&setmask);
289 pthread_sigmask(SIG_BLOCK, &setmask, NULL);
292 // On autorise la terminaison differee du thread
293 // (ces valeurs sont les valeurs par defaut mais on les force par precaution)
294 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
295 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
297 // On enregistre la fonction de suppression du fils en cas d'arret du thread
298 // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
299 // sera prise en compte par pthread_testcancel()
301 pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
302 pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
304 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
309 // Le code retour cumule (ORed) de tous les appels
310 // Nul en cas de reussite de l'ensemble des operations
313 // Cette table contient la liste des fichiers a detruire a la fin du processus
314 std::vector<string> files_to_delete;
318 // On copie les fichiers d'entree pour le fils
319 const Parametre param = p_ta->_job.getParametre();
320 Parametre::const_iterator it;
322 // On initialise la variable workdir a la valeur du Current Working Directory
323 char * cwd = new char [PATH_MAX];
324 getcwd(cwd, PATH_MAX);
325 string workdir = cwd;
328 if ( (it = param.find(WORKDIR)) != param.end() ) {
329 workdir = static_cast<string>( (*it).second );
332 string executionhost = string(param[EXECUTIONHOST]);
334 if ( (it = param.find(INFILE)) != param.end() ) {
335 Versatile V = (*it).second;
336 Versatile::iterator Vit;
338 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
339 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
341 string local = cp.getLocal();
342 string remote = cp.getRemote();
344 string copy_cmd = p_ta->getBatchManager().copy_command("", local, executionhost, workdir + "/" + remote);
345 UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
347 if (system(copy_cmd.c_str()) ) {
351 // On enregistre le fichier comme etant a detruire
352 files_to_delete.push_back(workdir + "/" + remote);
362 // On forke/exec un nouveau process pour pouvoir controler le fils
363 // (plus finement qu'avec un appel system)
364 // int rc = system(commande.c_str());
366 if (child < 0) { // erreur
367 UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
369 } else if (child > 0) { // pere
379 // On copie les fichiers de sortie du fils
380 if ( (it = param.find(OUTFILE)) != param.end() ) {
381 Versatile V = (*it).second;
382 Versatile::iterator Vit;
384 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
385 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
387 string local = cp.getLocal();
388 string remote = cp.getRemote();
390 string copy_cmd = p_ta->getBatchManager().copy_command(executionhost, workdir + "/" + remote, "", local);
391 UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
393 if (system(copy_cmd.c_str()) ) {
397 // On enregistre le fichier comme etant a detruire
398 files_to_delete.push_back(workdir + "/" + remote);
407 // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
408 // ou si la creation du fils n'a pu avoir lieu
409 if ( (rc == 0) || (child < 0) ) {
410 std::vector<string>::const_iterator it;
411 for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
412 string remove_cmd = p_ta->getBatchManager().remove_command(executionhost, *it);
413 UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
414 system(remove_cmd.c_str());
420 // On retire la fonction de nettoyage de la memoire
421 pthread_cleanup_pop(0);
423 // On retire la fonction de suppression du fils
424 pthread_cleanup_pop(0);
428 // On invoque la fonction de nettoyage de la memoire
431 UNDER_LOCK( cout << "Father is leaving" << endl );
440 void BatchManager_Local::ThreadAdapter::pere(pid_t child)
442 time_t child_starttime = time(NULL);
444 // On enregistre le fils dans la table des threads
445 pthread_t thread_id = pthread_self();
446 Id id = _bm.registerThread_id(thread_id);
448 Parametre param = _job.getParametre();
449 Environnement env = _job.getEnvironnement();
451 ostringstream thread_id_sst;
453 param[ID] = thread_id_sst.str();
454 param[STATE] = "Running";
457 // @@@ --------> SECTION CRITIQUE <-------- @@@
458 pthread_mutex_lock(&_bm._threads_mutex);
459 _bm._threads[id].thread_id = thread_id;
460 _bm._threads[id].pid = child;
461 _bm._threads[id].status = RUNNING;
462 _bm._threads[id].param = param;
463 _bm._threads[id].env = env;
464 _bm._threads[id].command_queue.push(NOP);
465 pthread_mutex_unlock(&_bm._threads_mutex);
466 // @@@ --------> SECTION CRITIQUE <-------- @@@
472 // on boucle en attendant que le fils ait termine
475 pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
476 if (child_wait_rc > 0) {
477 if (WIFSTOPPED(child_rc)) {
478 // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
479 // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
480 // desactive car s'il est possible de detecter l'arret d'un process, il est
481 // plus difficile de detecter sa reprise.
483 // Le fils est simplement stoppe
484 // @@@ --------> SECTION CRITIQUE <-------- @@@
485 pthread_mutex_lock(&_bm._threads_mutex);
486 _bm._threads[id].status = STOPPED;
487 _bm._threads[id].param[STATE] = "Stopped";
488 pthread_mutex_unlock(&_bm._threads_mutex);
489 // @@@ --------> SECTION CRITIQUE <-------- @@@
490 UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
494 // Le fils est termine, on sort de la boucle et du if englobant
495 // @@@ --------> SECTION CRITIQUE <-------- @@@
496 pthread_mutex_lock(&_bm._threads_mutex);
497 _bm._threads[id].status = DONE;
498 _bm._threads[id].param[STATE] = "Done";
499 pthread_mutex_unlock(&_bm._threads_mutex);
500 // @@@ --------> SECTION CRITIQUE <-------- @@@
501 UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
505 else if (child_wait_rc == -1) {
506 // Le fils a disparu ...
507 // @@@ --------> SECTION CRITIQUE <-------- @@@
508 pthread_mutex_lock(&_bm._threads_mutex);
509 _bm._threads[id].status = DEAD;
510 _bm._threads[id].param[STATE] = "Dead";
511 pthread_mutex_unlock(&_bm._threads_mutex);
512 // @@@ --------> SECTION CRITIQUE <-------- @@@
513 UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
519 // On teste si le thread doit etre detruit
520 pthread_testcancel();
524 // On regarde si le fils n'a pas depasse son temps (wallclock time)
525 time_t child_currenttime = time(NULL);
526 time_t child_elapsedtime = child_currenttime - child_starttime;
527 if (param.find(MAXWALLTIME) != param.end()) {
528 int maxwalltime = param[MAXWALLTIME];
529 // cout << "child_starttime = " << child_starttime << endl
530 // << "child_currenttime = " << child_currenttime << endl
531 // << "child_elapsedtime = " << child_elapsedtime << endl
532 // << "maxwalltime = " << maxwalltime << endl
533 // << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
534 if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
535 UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
536 // On introduit une commande dans la queue du thread
537 // @@@ --------> SECTION CRITIQUE <-------- @@@
538 pthread_mutex_lock(&_bm._threads_mutex);
539 if (_bm._threads.find(id) != _bm._threads.end())
540 _bm._threads[id].command_queue.push(KILL);
541 pthread_mutex_unlock(&_bm._threads_mutex);
542 // @@@ --------> SECTION CRITIQUE <-------- @@@
545 } else if (child_elapsedtime > maxwalltime ) {
546 UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
547 // On introduit une commande dans la queue du thread
548 // @@@ --------> SECTION CRITIQUE <-------- @@@
549 pthread_mutex_lock(&_bm._threads_mutex);
550 if (_bm._threads.find(id) != _bm._threads.end())
551 _bm._threads[id].command_queue.push(TERM);
552 pthread_mutex_unlock(&_bm._threads_mutex);
553 // @@@ --------> SECTION CRITIQUE <-------- @@@
559 // On regarde s'il y a quelque chose a faire dans la queue de commande
560 // @@@ --------> SECTION CRITIQUE <-------- @@@
561 pthread_mutex_lock(&_bm._threads_mutex);
562 if (_bm._threads.find(id) != _bm._threads.end()) {
563 while (_bm._threads[id].command_queue.size() > 0) {
564 Commande cmd = _bm._threads[id].command_queue.front();
565 _bm._threads[id].command_queue.pop();
569 UNDER_LOCK( cout << "Father does nothing to his child" << endl );
573 UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
574 kill(child, SIGSTOP);
578 UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
579 kill(child, SIGCONT);
583 UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
584 kill(child, SIGTERM);
588 UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
589 kill(child, SIGKILL);
601 pthread_mutex_unlock(&_bm._threads_mutex);
602 // @@@ --------> SECTION CRITIQUE <-------- @@@
604 // On fait une petite pause pour ne pas surcharger inutilement le processeur
615 void BatchManager_Local::ThreadAdapter::fils()
617 Parametre param = _job.getParametre();
618 Parametre::iterator it;
622 // On se place dans le repertoire de travail
623 if ( (it = param.find(WORKDIR)) != param.end() ) {
624 string workdir = static_cast<string>( (*it).second );
625 chdir(workdir.c_str());
631 // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
632 char * execpath = NULL;
633 if (param.find(EXECUTABLE) != param.end()) {
634 string executable = _bm.exec_command(param);
635 execpath = new char [executable.size() + 1];
636 strncpy(execpath, executable.c_str(), executable.size() + 1);
639 string debug_command = execpath;
641 string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
644 if (param.find(ARGUMENTS) != param.end()) {
645 Versatile V = param[ARGUMENTS];
647 argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
649 argv[0] = new char [name.size() + 1];
650 strncpy(argv[0], name.c_str(), name.size() + 1);
652 debug_command += string(" # ") + argv[0];
655 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
656 StringType argt = * static_cast<StringType *>(*it);
658 argv[i] = new char [arg.size() + 1];
659 strncpy(argv[i], arg.c_str(), arg.size() + 1);
660 debug_command += string(" # ") + argv[i];
663 // assert (i == V.size() + 1)
668 UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
672 Environnement env = _job.getEnvironnement();
677 envp = new char * [env.size() + 1]; // 1 pour le NULL terminal
679 for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
680 const string & key = (*it).first;
681 const string & value = (*it).second;
683 oss << key << "=" << value;
684 envp[i] = new char [oss.str().size() + 1];
685 strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
688 // assert (i == env.size())
695 // On positionne les limites systeme imposees au fils
696 if (param.find(MAXCPUTIME) != param.end()) {
697 int maxcputime = param[MAXCPUTIME];
699 limit.rlim_cur = maxcputime;
700 limit.rlim_max = int(maxcputime * 1.1);
701 setrlimit(RLIMIT_CPU, &limit);
704 if (param.find(MAXDISKSIZE) != param.end()) {
705 int maxdisksize = param[MAXDISKSIZE];
707 limit.rlim_cur = maxdisksize * 1024;
708 limit.rlim_max = int(maxdisksize * 1.1) * 1024;
709 setrlimit(RLIMIT_FSIZE, &limit);
712 if (param.find(MAXRAMSIZE) != param.end()) {
713 int maxramsize = param[MAXRAMSIZE];
715 limit.rlim_cur = maxramsize * 1024;
716 limit.rlim_max = int(maxramsize * 1.1) * 1024;
717 setrlimit(RLIMIT_AS, &limit);
722 // On cree une session pour le fils de facon a ce qu'il ne soit pas
723 // detruit lorsque le shell se termine (le shell ouvre une session et
724 // tue tous les process appartenant a la session en quittant)
728 // On ferme les descripteurs de fichiers standards
729 //close(STDIN_FILENO);
730 //close(STDOUT_FILENO);
731 //close(STDERR_FILENO);
734 // On execute la commande du fils
735 execve(execpath, argv, envp);
737 // No need to deallocate since nothing happens after a successful exec
739 // Normalement on ne devrait jamais arriver ici
740 ofstream file_err("error.log");
741 UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
743 } catch (GenericException & e) {
745 std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
755 void BatchManager_Local::kill_child_on_exit(void * p_pid)
757 pid_t child = * static_cast<pid_t *>(p_pid);
760 kill(child, SIGTERM);
762 // Nota : on pourrait aussi faire a la suite un kill(child, SIGKILL)
763 // mais cette option n'est pas implementee pour le moment, car il est
764 // preferable de laisser le process fils se terminer normalement et seul.
768 void BatchManager_Local::delete_on_exit(void * arg)
770 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);