From a6d087d86e82c076e1a94ce549a291c5e743ddb0 Mon Sep 17 00:00:00 2001 From: barate Date: Thu, 15 Apr 2010 14:55:38 +0000 Subject: [PATCH] Fixed another bug with threads in BatchManager_Local: no more segfault when destroying a local batch manager with jobs still running (jobs are now canceled at exit). In BatchManager_Local, jobs are now in "RUNNING" state until the end of file copy and cleanup. --- src/Local/Batch_BatchManager_Local.cxx | 69 +++++++++++++++---------- src/Local/Batch_BatchManager_Local.hxx | 14 ++--- src/Local/Test/Test_Local_RSH.cxx | 5 -- src/Local/Test/Test_Local_SH.cxx | 5 -- src/Local/Test/Test_Local_SSH.cxx | 5 -- src/Python/Test/Test_Python_Local_SH.py | 4 -- 6 files changed, 46 insertions(+), 56 deletions(-) diff --git a/src/Local/Batch_BatchManager_Local.cxx b/src/Local/Batch_BatchManager_Local.cxx index e9797fc..698f08d 100644 --- a/src/Local/Batch_BatchManager_Local.cxx +++ b/src/Local/Batch_BatchManager_Local.cxx @@ -67,14 +67,25 @@ namespace Batch { _idCounter(0) { pthread_mutex_init(&_threads_mutex, NULL); - pthread_cond_init(&_threadLaunchCondition, NULL); + pthread_cond_init(&_threadSyncCondition, NULL); } // Destructeur BatchManager_Local::~BatchManager_Local() { + for (map::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) { + pthread_mutex_lock(&_threads_mutex); + string state = iter->second.param[STATE]; + if (state != FINISHED && state != FAILED) { + UNDER_LOCK( cout << "Warning: Job " << iter->first << + " is not finished, it will now be canceled." << endl ); + pthread_cancel(iter->second.thread_id); + pthread_cond_wait(&_threadSyncCondition, &_threads_mutex); + } + pthread_mutex_unlock(&_threads_mutex); + } pthread_mutex_destroy(&_threads_mutex); - pthread_cond_destroy(&_threadLaunchCondition); + pthread_cond_destroy(&_threadSyncCondition); } const CommunicationProtocol & BatchManager_Local::getProtocol() const @@ -110,7 +121,7 @@ namespace Batch { throw RunTimeException("Can't create new thread in BatchManager_Local"); } - pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex); + pthread_cond_wait(&_threadSyncCondition, &_threads_mutex); pthread_mutex_unlock(&_threads_mutex); ostringstream id_sst; @@ -233,23 +244,24 @@ namespace Batch { istringstream iss(jobid.getReference()); iss >> id; - Status status; - //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl ); // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_threads_mutex); - status = _threads[id].status; + bool running = (_threads[id].param[STATE].str() == RUNNING); pthread_mutex_unlock(&_threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl ); - return (status == RUNNING); + return running; } // Methode de destruction d'un job void BatchManager_Local::cancel(pthread_t thread_id) { + pthread_mutex_lock(&_threads_mutex); pthread_cancel(thread_id); + pthread_cond_wait(&_threadSyncCondition, &_threads_mutex); + pthread_mutex_unlock(&_threads_mutex); } @@ -317,6 +329,8 @@ namespace Batch { // Methode d'execution du thread void * BatchManager_Local::ThreadAdapter::run(void * arg) { + ThreadAdapter * p_ta = static_cast(arg); + #ifndef WIN32 // On bloque tous les signaux pour ce thread sigset_t setmask; @@ -333,13 +347,10 @@ namespace Batch { // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation // sera prise en compte par pthread_testcancel() Process child; + pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg); pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast (&child)); pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg); - ThreadAdapter * p_ta = static_cast(arg); - - - // Le code retour cumule (ORed) de tous les appels // Nul en cas de reussite de l'ensemble des operations @@ -467,7 +478,10 @@ namespace Batch { } } + pthread_mutex_lock(&p_ta->_bm._threads_mutex); + // Set the job state to FINISHED or FAILED + p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED; // On retire la fonction de nettoyage de la memoire pthread_cleanup_pop(0); @@ -475,7 +489,10 @@ namespace Batch { // On retire la fonction de suppression du fils pthread_cleanup_pop(0); + // remove setFailedOnCancel function from cancel stack + pthread_cleanup_pop(0); + pthread_mutex_unlock(&p_ta->_bm._threads_mutex); // On invoque la fonction de nettoyage de la memoire delete_on_exit(arg); @@ -510,14 +527,14 @@ namespace Batch { #ifndef WIN32 _bm._threads[_id].pid = child; #endif - _bm._threads[_id].status = RUNNING; + _bm._threads[_id].hasFailed = false; _bm._threads[_id].param = param; _bm._threads[_id].env = env; _bm._threads[_id].command_queue.push(NOP); // Unlock the master thread. From here, all shared variables must be protected // from concurrent access - pthread_cond_signal(&_bm._threadLaunchCondition); + pthread_cond_signal(&_bm._threadSyncCondition); // on boucle en attendant que le fils ait termine @@ -526,11 +543,6 @@ namespace Batch { DWORD exitCode; BOOL res = GetExitCodeProcess(child, &exitCode); if (exitCode != STILL_ACTIVE) { - pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[_id].status = DONE; - _bm._threads[_id].param[STATE] = Batch::FINISHED; - pthread_mutex_unlock(&_bm._threads_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl ); break; } @@ -557,7 +569,6 @@ namespace Batch { // Le fils est simplement stoppe // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[_id].status = STOPPED; _bm._threads[_id].param[STATE] = Batch::PAUSED; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ @@ -566,12 +577,6 @@ namespace Batch { } else { // Le fils est termine, on sort de la boucle et du if englobant - // @@@ --------> SECTION CRITIQUE <-------- @@@ - pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[_id].status = DONE; - _bm._threads[_id].param[STATE] = Batch::FINISHED; - pthread_mutex_unlock(&_bm._threads_mutex); - // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl ); break; } @@ -580,8 +585,7 @@ namespace Batch { // Le fils a disparu ... // @@@ --------> SECTION CRITIQUE <-------- @@@ pthread_mutex_lock(&_bm._threads_mutex); - _bm._threads[_id].status = DEAD; - _bm._threads[_id].param[STATE] = Batch::FAILED; + _bm._threads[_id].hasFailed = true; pthread_mutex_unlock(&_bm._threads_mutex); // @@@ --------> SECTION CRITIQUE <-------- @@@ UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl ); @@ -909,4 +913,15 @@ namespace Batch { delete p_ta; } + void BatchManager_Local::setFailedOnCancel(void * arg) + { + ThreadAdapter * p_ta = static_cast(arg); + pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex); + p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED; + pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex); + + // Unlock the master thread. From here, the batch manager instance should not be used. + pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition); + } + } diff --git a/src/Local/Batch_BatchManager_Local.hxx b/src/Local/Batch_BatchManager_Local.hxx index aed1fa5..6f27b2a 100644 --- a/src/Local/Batch_BatchManager_Local.hxx +++ b/src/Local/Batch_BatchManager_Local.hxx @@ -77,6 +77,7 @@ namespace Batch { ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id); static void * run(void * arg); BatchManager_Local & getBatchManager() const { return _bm; }; + Id getId() const { return _id; }; protected: BatchManager_Local & _bm; @@ -102,20 +103,12 @@ namespace Batch { ALTER }; - enum Status { - UNKNOWN = 0, - RUNNING, - STOPPED, - DONE, - DEAD - }; - struct Child { pthread_t thread_id; std::queue > command_queue; pid_t pid; int exit_code; - Status status; + bool hasFailed; Parametre param; Environnement env; }; @@ -164,7 +157,8 @@ namespace Batch { virtual void cancel(pthread_t thread_id); static void kill_child_on_exit(void * p_pid); static void delete_on_exit(void * arg); - pthread_cond_t _threadLaunchCondition; + static void setFailedOnCancel(void * arg); + pthread_cond_t _threadSyncCondition; Id _idCounter; #ifdef SWIG diff --git a/src/Local/Test/Test_Local_RSH.cxx b/src/Local/Test/Test_Local_RSH.cxx index 8120e42..700daf5 100644 --- a/src/Local/Test/Test_Local_RSH.cxx +++ b/src/Local/Test/Test_Local_RSH.cxx @@ -64,7 +64,6 @@ int main(int argc, char** argv) const string & exechost = parser.getValue("TEST_LOCAL_RSH_EXECUTION_HOST"); const string & user = parser.getValue("TEST_LOCAL_RSH_USER"); int timeout = parser.getValueAsInt("TEST_LOCAL_RSH_TIMEOUT"); - int finalizationTime = parser.getValueAsInt("TEST_LOCAL_RSH_FINALIZATION_TIME"); // Define the job... Job job; @@ -116,10 +115,6 @@ int main(int argc, char** argv) cout << "Job " << jobid.__repr__() << " is done" << endl; - // wait for the copy of output files and the cleanup - // (there's no cleaner way to do that yet) - sleep(finalizationTime); - } catch (GenericException e) { cerr << "Error: " << e << endl; return 1; diff --git a/src/Local/Test/Test_Local_SH.cxx b/src/Local/Test/Test_Local_SH.cxx index f7ed8e4..bcac2bf 100644 --- a/src/Local/Test/Test_Local_SH.cxx +++ b/src/Local/Test/Test_Local_SH.cxx @@ -63,7 +63,6 @@ int main(int argc, char** argv) parser.parseTestConfigFile(); const string & workdir = parser.getValue("TEST_LOCAL_SH_WORK_DIR"); int timeout = parser.getValueAsInt("TEST_LOCAL_SH_TIMEOUT"); - int finalizationTime = parser.getValueAsInt("TEST_LOCAL_SH_FINALIZATION_TIME"); // Define the job... Job job; @@ -116,10 +115,6 @@ int main(int argc, char** argv) cout << "Job " << jobid.__repr__() << " is done" << endl; - // wait for the copy of output files and the cleanup - // (there's no cleaner way to do that yet) - sleep(finalizationTime); - } catch (GenericException e) { cerr << "Error: " << e << endl; return 1; diff --git a/src/Local/Test/Test_Local_SSH.cxx b/src/Local/Test/Test_Local_SSH.cxx index d216954..9db8917 100644 --- a/src/Local/Test/Test_Local_SSH.cxx +++ b/src/Local/Test/Test_Local_SSH.cxx @@ -65,7 +65,6 @@ int main(int argc, char** argv) const string & exechost = parser.getValue("TEST_LOCAL_SSH_EXECUTION_HOST"); const string & user = parser.getValue("TEST_LOCAL_SSH_USER"); int timeout = parser.getValueAsInt("TEST_LOCAL_SSH_TIMEOUT"); - int finalizationTime = parser.getValueAsInt("TEST_LOCAL_SSH_FINALIZATION_TIME"); // Define the job... Job job; @@ -119,10 +118,6 @@ int main(int argc, char** argv) cout << "Job " << jobid.__repr__() << " is done" << endl; - // wait for the copy of output files and the cleanup - // (there's no cleaner way to do that yet) - sleep(finalizationTime); - } catch (GenericException e) { cerr << "Error: " << e << endl; return 1; diff --git a/src/Python/Test/Test_Python_Local_SH.py b/src/Python/Test/Test_Python_Local_SH.py index 4427bab..11194e4 100644 --- a/src/Python/Test/Test_Python_Local_SH.py +++ b/src/Python/Test/Test_Python_Local_SH.py @@ -87,10 +87,6 @@ def work(): print "Job", jobid, "is done" - # wait for the copy of output files and the cleanup - # (there's no cleaner way to do that yet) - time.sleep(config.TEST_LOCAL_SH_FINALIZATION_TIME) - # test the result file exp = "c = 12" f = open('result.txt') -- 2.39.2