X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;f=src%2FLocal%2FBatch_BatchManager_Local.cxx;h=c7d3f60d0f857065feee0ac6a341f8d4bd9f45c2;hb=57357f17589b34254a01504b081f7b18ac898612;hp=aadbcdce76ddb81e0e0b0565526b1ee68f330873;hpb=44bb5fd4c2d81932971a482ed576d39e33020dcc;p=tools%2Flibbatch.git diff --git a/src/Local/Batch_BatchManager_Local.cxx b/src/Local/Batch_BatchManager_Local.cxx index aadbcdc..c7d3f60 100644 --- a/src/Local/Batch_BatchManager_Local.cxx +++ b/src/Local/Batch_BatchManager_Local.cxx @@ -1,4 +1,4 @@ -// Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE +// Copyright (C) 2007-2011 CEA/DEN, EDF R&D, OPEN CASCADE // // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS @@ -41,7 +41,6 @@ #include #ifdef WIN32 #include -#include "Batch_RunTimeException.hxx" #else #include #include @@ -51,8 +50,11 @@ #include #include #include + +#include "Batch_Constants.hxx" #include "Batch_IOMutex.hxx" #include "Batch_BatchManager_Local.hxx" +#include "Batch_RunTimeException.hxx" using namespace std; @@ -62,21 +64,30 @@ namespace Batch { // Constructeur BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host, CommunicationProtocolType protocolType) - : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), + : BatchManager(parent, host), _connect(0), _protocol(CommunicationProtocol::getInstance(protocolType)), - _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association() + _idCounter(0) { pthread_mutex_init(&_threads_mutex, NULL); - pthread_mutex_init(&_thread_id_id_association_mutex, NULL); - pthread_cond_init(&_thread_id_id_association_cond, NULL); + pthread_cond_init(&_threadSyncCondition, NULL); } // Destructeur BatchManager_Local::~BatchManager_Local() { + for (map::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) { + pthread_mutex_lock(&_threads_mutex); + string state = iter->second.param[STATE]; + if (state != FINISHED && state != FAILED) { + UNDER_LOCK( cout << "Warning: Job " << iter->first << + " is not finished, it will now be canceled." << endl ); + pthread_cancel(iter->second.thread_id); + pthread_cond_wait(&_threadSyncCondition, &_threads_mutex); + } + pthread_mutex_unlock(&_threads_mutex); + } pthread_mutex_destroy(&_threads_mutex); - pthread_mutex_destroy(&_thread_id_id_association_mutex); - pthread_cond_destroy(&_thread_id_id_association_cond); + pthread_cond_destroy(&_threadSyncCondition); } const CommunicationProtocol & BatchManager_Local::getProtocol() const @@ -88,15 +99,36 @@ namespace Batch { const JobId BatchManager_Local::submitJob(const Job & job) { Job_Local jobLocal = job; + Id id = _idCounter++; + ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id); - pthread_t thread_id = submit(jobLocal); + // Les attributs du thread a sa creation + pthread_attr_t thread_attr; + pthread_attr_init(&thread_attr); + pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); - ostringstream oss; - oss << getIdByThread_id(thread_id); + // Creation du thread qui va executer la commande systeme qu'on lui passe + pthread_t thread_id; + pthread_mutex_lock(&_threads_mutex); + int rc = pthread_create(&thread_id, + &thread_attr, + &ThreadAdapter::run, + static_cast(p_ta)); + + // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread + pthread_attr_destroy(&thread_attr); - JobId id(this, oss.str()); + if (rc != 0) { + pthread_mutex_unlock(&_threads_mutex); + throw RunTimeException("Can't create new thread in BatchManager_Local"); + } + + pthread_cond_wait(&_threadSyncCondition, &_threads_mutex); + pthread_mutex_unlock(&_threads_mutex); - return id; + ostringstream id_sst; + id_sst << id; + return JobId(this, id_sst.str()); } // Methode pour le controle des jobs : retire un job du gestionnaire @@ -107,17 +139,24 @@ namespace Batch { istringstream iss(jobid.getReference()); iss >> id; - // On retrouve le thread_id du thread - pthread_t thread_id; - // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_threads_mutex); - if (_threads.find(id) != _threads.end()) - thread_id = _threads[id].thread_id; + bool idFound = (_threads.find(id) != _threads.end()); + if (idFound) { + string state = _threads[id].param[STATE]; + if (state != FINISHED && state != FAILED) { + pthread_cancel(_threads[id].thread_id); + pthread_cond_wait(&_threadSyncCondition, &_threads_mutex); + } else { + cout << "Cannot delete job " << jobid.getReference() << + ". Job is already finished." << endl; + } + } pthread_mutex_unlock(&_threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ - cancel(thread_id); + if (!idFound) + throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist"); } // Methode pour le controle des jobs : suspend un job en file d'attente @@ -189,17 +228,31 @@ namespace Batch { //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl ); // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_threads_mutex); - param = _threads[id].param; - env = _threads[id].env; + std::map::iterator pos = _threads.find(id); + bool found = (pos != _threads.end()); + if (found) { + param = pos->second.param; + env = pos->second.env; + } pthread_mutex_unlock(&_threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl ); + if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob"); + JobInfo_Local ji(param, env); return ji; } + // Ce manager ne peut pas reprendre un job + // On force donc l'état du job à erreur - pour cela on ne donne pas d'Id + // au JobId + const Batch::JobId + BatchManager_Local::addJob(const Batch::Job & job, const std::string reference) + { + return JobId(this, "undefined"); + } // Methode pour le controle des jobs : teste si un job est present en machine bool BatchManager_Local::isRunning(const JobId & jobid) @@ -208,57 +261,15 @@ namespace Batch { istringstream iss(jobid.getReference()); iss >> id; - Status status; - //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl ); // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_threads_mutex); - status = _threads[id].status; + bool running = (_threads[id].param[STATE].str() == RUNNING); pthread_mutex_unlock(&_threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl ); - return (status == RUNNING); - } - - - // Methode d'execution d'un job - pthread_t BatchManager_Local::submit(const Job_Local & job) - { - // L'id du thread a creer - pthread_t thread_id = -#ifdef WIN32 - {0,0}; -#else - 0; -#endif - - // Les attributs du thread a sa creation - pthread_attr_t thread_attr; - pthread_attr_init(&thread_attr); - pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); - - ThreadAdapter * p_ta = new ThreadAdapter(*this, job); - - // Creation du thread qui va executer la commande systeme qu'on lui passe - int rc = pthread_create(&thread_id, - &thread_attr, - &ThreadAdapter::run, - static_cast(p_ta)); - if (rc) { - } - - // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread - pthread_attr_destroy(&thread_attr); - - return thread_id; - } - - - // Methode de destruction d'un job - void BatchManager_Local::cancel(pthread_t thread_id) - { - pthread_cancel(thread_id); + return running; } @@ -283,6 +294,26 @@ namespace Batch { } } + if (param.find(INFILE) != param.end()) { + Versatile V = param[INFILE]; + for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) { + Couple cpl = * static_cast(*it); + string remote = cpl.getRemote(); + if (remote == "stdin") + exec_sub_cmd << " (*it); + string remote = cpl.getRemote(); + if (remote == "stdout") exec_sub_cmd << " 1>stdout"; + if (remote == "stderr") exec_sub_cmd << " 2>stderr"; + } + } + string user; Parametre::const_iterator it = param.find(USER); if (it != param.end()) { @@ -293,79 +324,10 @@ namespace Batch { } - // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique - // au cours du temps (il peut etre reutilise lorsqu'un thread se termine) - // ATTENTION : cette methode est uniquement protegee par la section critique de l'association - // Thread_id / Id (_thread_id_id_association_mutex) - BatchManager_Local::Id BatchManager_Local::nextId() - { - static Id id = 0; - Id nextId = id++; - //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl ); - return nextId; - } - - - // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement - BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) - { - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_thread_id_id_association_mutex); - bool threadIdFound = false; - std::list::iterator it; - while (!threadIdFound) { - for (it = _thread_id_id_association.begin() ; - it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ; - it++); - if (it == _thread_id_id_association.end()) - pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex); - else - threadIdFound = true; - } - - Id id = it->id; - _thread_id_id_association.erase(it); - - pthread_mutex_unlock(&_thread_id_id_association_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ - - //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl ); - return id; - } - - - // Associe un Thread_id a un Id nouvellement cree - BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) - { - Id id = -1; - - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_thread_id_id_association_mutex); - std::list::iterator it; - for (it = _thread_id_id_association.begin() ; - it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ; - it++); - if (it == _thread_id_id_association.end()) { - struct ThreadIdIdAssociation newAssociation; - id = newAssociation.id = nextId(); - newAssociation.threadId = thread_id; - _thread_id_id_association.push_back(newAssociation); - pthread_cond_signal(&_thread_id_id_association_cond); - - } else { - UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl ); - } - pthread_mutex_unlock(&_thread_id_id_association_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ - - //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl ); - return id; - } - // Constructeur de la classe ThreadAdapter - BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) : - _bm(bm), _job(job) + BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) : + _bm(bm), _job(job), _id(id) { // Nothing to do } @@ -375,6 +337,8 @@ namespace Batch { // Methode d'execution du thread void * BatchManager_Local::ThreadAdapter::run(void * arg) { + ThreadAdapter * p_ta = static_cast(arg); + #ifndef WIN32 // On bloque tous les signaux pour ce thread sigset_t setmask; @@ -391,12 +355,9 @@ namespace Batch { // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation // sera prise en compte par pthread_testcancel() Process child; - pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast (&child)); pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg); - - ThreadAdapter * p_ta = static_cast(arg); - - + pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg); + pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast (&child)); // Le code retour cumule (ORed) de tous les appels @@ -525,7 +486,10 @@ namespace Batch { } } + pthread_mutex_lock(&p_ta->_bm._threads_mutex); + // Set the job state to FINISHED or FAILED + p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED; // On retire la fonction de nettoyage de la memoire pthread_cleanup_pop(0); @@ -533,7 +497,10 @@ namespace Batch { // On retire la fonction de suppression du fils pthread_cleanup_pop(0); + // remove setFailedOnCancel function from cancel stack + pthread_cleanup_pop(0); + pthread_mutex_unlock(&p_ta->_bm._threads_mutex); // On invoque la fonction de nettoyage de la memoire delete_on_exit(arg); @@ -552,33 +519,30 @@ namespace Batch { // On enregistre le fils dans la table des threads pthread_t thread_id = pthread_self(); - Id id = _bm.registerThread_id(thread_id); Parametre param = _job.getParametre(); Environnement env = _job.getEnvironnement(); - ostringstream thread_id_sst; - thread_id_sst << id; - param[ID] = thread_id_sst.str(); - param[STATE] = "Running"; + ostringstream id_sst; + id_sst << _id; + param[ID] = id_sst.str(); + param[STATE] = Batch::RUNNING; #ifndef WIN32 param[PID] = child; #endif - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].thread_id = thread_id; + _bm._threads[_id].thread_id = thread_id; #ifndef WIN32 - _bm._threads[id].pid = child; + _bm._threads[_id].pid = child; #endif - _bm._threads[id].status = RUNNING; - _bm._threads[id].param = param; - _bm._threads[id].env = env; - _bm._threads[id].command_queue.push(NOP); - pthread_mutex_unlock(&_bm._threads_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ - + _bm._threads[_id].hasFailed = false; + _bm._threads[_id].param = param; + _bm._threads[_id].env = env; + _bm._threads[_id].command_queue.push(NOP); + // Unlock the master thread. From here, all shared variables must be protected + // from concurrent access + pthread_cond_signal(&_bm._threadSyncCondition); // on boucle en attendant que le fils ait termine @@ -587,11 +551,6 @@ namespace Batch { DWORD exitCode; BOOL res = GetExitCodeProcess(child, &exitCode); if (exitCode != STILL_ACTIVE) { - pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = DONE; - _bm._threads[id].param[STATE] = "Done"; - pthread_mutex_unlock(&_bm._threads_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl ); break; } @@ -606,7 +565,9 @@ namespace Batch { UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl); UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl); UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl); - UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); +#ifdef WIFCONTINUED + UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge +#endif if (WIFSTOPPED(child_rc)) { // NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED // soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment @@ -616,8 +577,7 @@ namespace Batch { // Le fils est simplement stoppe // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = STOPPED; - _bm._threads[id].param[STATE] = "Stopped"; + _bm._threads[_id].param[STATE] = Batch::PAUSED; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl ); @@ -625,12 +585,6 @@ namespace Batch { } else { // Le fils est termine, on sort de la boucle et du if englobant - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = DONE; - _bm._threads[id].param[STATE] = "Done"; - pthread_mutex_unlock(&_bm._threads_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl ); break; } @@ -639,8 +593,7 @@ namespace Batch { // Le fils a disparu ... // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = DEAD; - _bm._threads[id].param[STATE] = "Dead"; + _bm._threads[_id].hasFailed = true; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl ); @@ -655,32 +608,32 @@ namespace Batch { // On regarde si le fils n'a pas depasse son temps (wallclock time) time_t child_currenttime = time(NULL); - time_t child_elapsedtime = child_currenttime - child_starttime; + long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L; if (param.find(MAXWALLTIME) != param.end()) { - int maxwalltime = param[MAXWALLTIME]; + long maxwalltime = param[MAXWALLTIME]; // cout << "child_starttime = " << child_starttime << endl // << "child_currenttime = " << child_currenttime << endl // << "child_elapsedtime = " << child_elapsedtime << endl // << "maxwalltime = " << maxwalltime << endl // << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl; - if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL - UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl ); + if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL + UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl ); // On introduit une commande dans la queue du thread // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - if (_bm._threads.find(id) != _bm._threads.end()) - _bm._threads[id].command_queue.push(KILL); + if (_bm._threads.find(_id) != _bm._threads.end()) + _bm._threads[_id].command_queue.push(KILL); pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ - } else if (child_elapsedtime > maxwalltime ) { - UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl ); + } else if (child_elapsedtime_minutes > maxwalltime ) { + UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl ); // On introduit une commande dans la queue du thread // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - if (_bm._threads.find(id) != _bm._threads.end()) - _bm._threads[id].command_queue.push(TERM); + if (_bm._threads.find(_id) != _bm._threads.end()) + _bm._threads[_id].command_queue.push(TERM); pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ } @@ -691,10 +644,10 @@ namespace Batch { // On regarde s'il y a quelque chose a faire dans la queue de commande // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - if (_bm._threads.find(id) != _bm._threads.end()) { - while (_bm._threads[id].command_queue.size() > 0) { - Commande cmd = _bm._threads[id].command_queue.front(); - _bm._threads[id].command_queue.pop(); + if (_bm._threads.find(_id) != _bm._threads.end()) { + while (_bm._threads[_id].command_queue.size() > 0) { + Commande cmd = _bm._threads[_id].command_queue.front(); + _bm._threads[_id].command_queue.pop(); switch (cmd) { case NOP: @@ -829,8 +782,8 @@ namespace Batch { if (param.find(MAXRAMSIZE) != param.end()) { int maxramsize = param[MAXRAMSIZE]; struct rlimit limit; - limit.rlim_cur = maxramsize * 1024; - limit.rlim_max = int(maxramsize * 1.1) * 1024; + limit.rlim_cur = maxramsize * 1024 * 1024; + limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024; setrlimit(RLIMIT_AS, &limit); } @@ -853,7 +806,7 @@ namespace Batch { // On execute la commande du fils - int result = execve(argv[0], argv, envp); + execve(argv[0], argv, envp); UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl ); // No need to deallocate since nothing happens after a successful exec @@ -968,4 +921,15 @@ namespace Batch { delete p_ta; } + void BatchManager_Local::setFailedOnCancel(void * arg) + { + ThreadAdapter * p_ta = static_cast(arg); + pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex); + p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED; + pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex); + + // Unlock the master thread. From here, the batch manager instance should not be used. + pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition); + } + }