From 639641b8600eb1d52265aa777da64ebffbe961df Mon Sep 17 00:00:00 2001 From: barate Date: Wed, 7 Apr 2010 13:56:41 +0000 Subject: [PATCH] Fixed bug with threads in BatchManager_Local. --- src/Local/Batch_BatchManager_Local.cxx | 223 ++++++++----------------- src/Local/Batch_BatchManager_Local.hxx | 24 +-- 2 files changed, 80 insertions(+), 167 deletions(-) diff --git a/src/Local/Batch_BatchManager_Local.cxx b/src/Local/Batch_BatchManager_Local.cxx index 37ea02f..5cc3b23 100644 --- a/src/Local/Batch_BatchManager_Local.cxx +++ b/src/Local/Batch_BatchManager_Local.cxx @@ -41,7 +41,6 @@ #include #ifdef WIN32 #include -#include "Batch_RunTimeException.hxx" #else #include #include @@ -53,6 +52,7 @@ #include #include "Batch_IOMutex.hxx" #include "Batch_BatchManager_Local.hxx" +#include "Batch_RunTimeException.hxx" using namespace std; @@ -62,21 +62,19 @@ namespace Batch { // Constructeur BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host, CommunicationProtocolType protocolType) - : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), + : BatchManager(parent, host), _connect(0), _protocol(CommunicationProtocol::getInstance(protocolType)), - _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association() + _idCounter(0) { pthread_mutex_init(&_threads_mutex, NULL); - pthread_mutex_init(&_thread_id_id_association_mutex, NULL); - pthread_cond_init(&_thread_id_id_association_cond, NULL); + pthread_cond_init(&_threadLaunchCondition, NULL); } // Destructeur BatchManager_Local::~BatchManager_Local() { pthread_mutex_destroy(&_threads_mutex); - pthread_mutex_destroy(&_thread_id_id_association_mutex); - pthread_cond_destroy(&_thread_id_id_association_cond); + pthread_cond_destroy(&_threadLaunchCondition); } const CommunicationProtocol & BatchManager_Local::getProtocol() const @@ -88,15 +86,36 @@ namespace Batch { const JobId BatchManager_Local::submitJob(const Job & job) { Job_Local jobLocal = job; + Id id = _idCounter++; + ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id); + + // Les attributs du thread a sa creation + pthread_attr_t thread_attr; + pthread_attr_init(&thread_attr); + pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); + + // Creation du thread qui va executer la commande systeme qu'on lui passe + pthread_t thread_id; + pthread_mutex_lock(&_threads_mutex); + int rc = pthread_create(&thread_id, + &thread_attr, + &ThreadAdapter::run, + static_cast(p_ta)); - pthread_t thread_id = submit(jobLocal); + // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread + pthread_attr_destroy(&thread_attr); - ostringstream oss; - oss << getIdByThread_id(thread_id); + if (rc != 0) { + pthread_mutex_unlock(&_threads_mutex); + throw RunTimeException("Can't create new thread in BatchManager_Local"); + } - JobId id(this, oss.str()); + pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex); + pthread_mutex_unlock(&_threads_mutex); - return id; + ostringstream id_sst; + id_sst << id; + return JobId(this, id_sst.str()); } // Methode pour le controle des jobs : retire un job du gestionnaire @@ -189,12 +208,18 @@ namespace Batch { //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl ); // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_threads_mutex); - param = _threads[id].param; - env = _threads[id].env; + std::map::iterator pos = _threads.find(id); + bool found = (pos != _threads.end()); + if (found) { + param = pos->second.param; + env = pos->second.env; + } pthread_mutex_unlock(&_threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl ); + if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob"); + JobInfo_Local ji(param, env); return ji; } @@ -221,40 +246,6 @@ namespace Batch { return (status == RUNNING); } - - // Methode d'execution d'un job - pthread_t BatchManager_Local::submit(const Job_Local & job) - { - // L'id du thread a creer - pthread_t thread_id = -#ifdef WIN32 - {0,0}; -#else - 0; -#endif - - // Les attributs du thread a sa creation - pthread_attr_t thread_attr; - pthread_attr_init(&thread_attr); - pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); - - ThreadAdapter * p_ta = new ThreadAdapter(*this, job); - - // Creation du thread qui va executer la commande systeme qu'on lui passe - int rc = pthread_create(&thread_id, - &thread_attr, - &ThreadAdapter::run, - static_cast(p_ta)); - if (rc) { - } - - // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread - pthread_attr_destroy(&thread_attr); - - return thread_id; - } - - // Methode de destruction d'un job void BatchManager_Local::cancel(pthread_t thread_id) { @@ -293,79 +284,10 @@ namespace Batch { } - // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique - // au cours du temps (il peut etre reutilise lorsqu'un thread se termine) - // ATTENTION : cette methode est uniquement protegee par la section critique de l'association - // Thread_id / Id (_thread_id_id_association_mutex) - BatchManager_Local::Id BatchManager_Local::nextId() - { - static Id id = 0; - Id nextId = id++; - //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl ); - return nextId; - } - - - // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement - BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) - { - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_thread_id_id_association_mutex); - bool threadIdFound = false; - std::list::iterator it; - while (!threadIdFound) { - for (it = _thread_id_id_association.begin() ; - it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ; - it++); - if (it == _thread_id_id_association.end()) - pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex); - else - threadIdFound = true; - } - - Id id = it->id; - _thread_id_id_association.erase(it); - - pthread_mutex_unlock(&_thread_id_id_association_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ - - //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl ); - return id; - } - - - // Associe un Thread_id a un Id nouvellement cree - BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) - { - Id id = -1; - - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_thread_id_id_association_mutex); - std::list::iterator it; - for (it = _thread_id_id_association.begin() ; - it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ; - it++); - if (it == _thread_id_id_association.end()) { - struct ThreadIdIdAssociation newAssociation; - id = newAssociation.id = nextId(); - newAssociation.threadId = thread_id; - _thread_id_id_association.push_back(newAssociation); - pthread_cond_signal(&_thread_id_id_association_cond); - - } else { - UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl ); - } - pthread_mutex_unlock(&_thread_id_id_association_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ - - //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl ); - return id; - } - // Constructeur de la classe ThreadAdapter - BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) : - _bm(bm), _job(job) + BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) : + _bm(bm), _job(job), _id(id) { // Nothing to do } @@ -552,33 +474,30 @@ namespace Batch { // On enregistre le fils dans la table des threads pthread_t thread_id = pthread_self(); - Id id = _bm.registerThread_id(thread_id); Parametre param = _job.getParametre(); Environnement env = _job.getEnvironnement(); - ostringstream thread_id_sst; - thread_id_sst << id; - param[ID] = thread_id_sst.str(); + ostringstream id_sst; + id_sst << _id; + param[ID] = id_sst.str(); param[STATE] = Batch::RUNNING; #ifndef WIN32 param[PID] = child; #endif - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].thread_id = thread_id; + _bm._threads[_id].thread_id = thread_id; #ifndef WIN32 - _bm._threads[id].pid = child; + _bm._threads[_id].pid = child; #endif - _bm._threads[id].status = RUNNING; - _bm._threads[id].param = param; - _bm._threads[id].env = env; - _bm._threads[id].command_queue.push(NOP); - pthread_mutex_unlock(&_bm._threads_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ - + _bm._threads[_id].status = RUNNING; + _bm._threads[_id].param = param; + _bm._threads[_id].env = env; + _bm._threads[_id].command_queue.push(NOP); + // Unlock the master thread. From here, all shared variables must be protected + // from concurrent access + pthread_cond_signal(&_bm._threadLaunchCondition); // on boucle en attendant que le fils ait termine @@ -588,8 +507,8 @@ namespace Batch { BOOL res = GetExitCodeProcess(child, &exitCode); if (exitCode != STILL_ACTIVE) { pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = DONE; - _bm._threads[id].param[STATE] = Batch::FINISHED; + _bm._threads[_id].status = DONE; + _bm._threads[_id].param[STATE] = Batch::FINISHED; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl ); @@ -618,8 +537,8 @@ namespace Batch { // Le fils est simplement stoppe // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = STOPPED; - _bm._threads[id].param[STATE] = Batch::PAUSED; + _bm._threads[_id].status = STOPPED; + _bm._threads[_id].param[STATE] = Batch::PAUSED; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl ); @@ -629,8 +548,8 @@ namespace Batch { // Le fils est termine, on sort de la boucle et du if englobant // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = DONE; - _bm._threads[id].param[STATE] = Batch::FINISHED; + _bm._threads[_id].status = DONE; + _bm._threads[_id].param[STATE] = Batch::FINISHED; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl ); @@ -641,8 +560,8 @@ namespace Batch { // Le fils a disparu ... // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[id].status = DEAD; - _bm._threads[id].param[STATE] = Batch::FAILED; + _bm._threads[_id].status = DEAD; + _bm._threads[_id].param[STATE] = Batch::FAILED; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl ); @@ -666,23 +585,23 @@ namespace Batch { // << "maxwalltime = " << maxwalltime << endl // << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl; if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL - UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl ); + UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl ); // On introduit une commande dans la queue du thread // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - if (_bm._threads.find(id) != _bm._threads.end()) - _bm._threads[id].command_queue.push(KILL); + if (_bm._threads.find(_id) != _bm._threads.end()) + _bm._threads[_id].command_queue.push(KILL); pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ } else if (child_elapsedtime > maxwalltime ) { - UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl ); + UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl ); // On introduit une commande dans la queue du thread // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - if (_bm._threads.find(id) != _bm._threads.end()) - _bm._threads[id].command_queue.push(TERM); + if (_bm._threads.find(_id) != _bm._threads.end()) + _bm._threads[_id].command_queue.push(TERM); pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ } @@ -693,10 +612,10 @@ namespace Batch { // On regarde s'il y a quelque chose a faire dans la queue de commande // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - if (_bm._threads.find(id) != _bm._threads.end()) { - while (_bm._threads[id].command_queue.size() > 0) { - Commande cmd = _bm._threads[id].command_queue.front(); - _bm._threads[id].command_queue.pop(); + if (_bm._threads.find(_id) != _bm._threads.end()) { + while (_bm._threads[_id].command_queue.size() > 0) { + Commande cmd = _bm._threads[_id].command_queue.front(); + _bm._threads[_id].command_queue.pop(); switch (cmd) { case NOP: diff --git a/src/Local/Batch_BatchManager_Local.hxx b/src/Local/Batch_BatchManager_Local.hxx index bae6592..aed1fa5 100644 --- a/src/Local/Batch_BatchManager_Local.hxx +++ b/src/Local/Batch_BatchManager_Local.hxx @@ -58,9 +58,14 @@ namespace Batch { class FactBatchManager; + /*! + * This class defines a local pseudo batch manager that can launch jobs locally or on a remote + * machine with SSH or RSH. This class is NOT thread-safe. + */ class BATCH_EXPORT BatchManager_Local : virtual public BatchManager { private: + typedef int Id; #ifdef WIN32 typedef HANDLE Process; #else @@ -69,13 +74,14 @@ namespace Batch { friend class ThreadAdapter; class ThreadAdapter{ public: - ThreadAdapter(BatchManager_Local & bm, const Job_Local & job); + ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id); static void * run(void * arg); BatchManager_Local & getBatchManager() const { return _bm; }; protected: BatchManager_Local & _bm; const Job_Local _job; + Id _id; private: void pere(Process child); @@ -87,8 +93,6 @@ namespace Batch { }; - typedef int Id; - enum Commande { NOP = 0, HOLD, @@ -157,21 +161,11 @@ namespace Batch { std::vector exec_command(const Parametre & param) const; private: - struct ThreadIdIdAssociation { - pthread_t threadId; - Id id; - }; - - virtual pthread_t submit(const Job_Local & job); virtual void cancel(pthread_t thread_id); static void kill_child_on_exit(void * p_pid); static void delete_on_exit(void * arg); - Id nextId(); // Retourne un identifiant unique pour un thread (clef de la map) - Id getIdByThread_id(pthread_t thread_id); - Id registerThread_id(pthread_t thread_id); - pthread_mutex_t _thread_id_id_association_mutex; - pthread_cond_t _thread_id_id_association_cond; - std::list _thread_id_id_association; + pthread_cond_t _threadLaunchCondition; + Id _idCounter; #ifdef SWIG public: -- 2.39.2