Salome HOME
Fixed another bug with threads in BatchManager_Local: no more segfault when destroyin...
authorbarate <barate>
Thu, 15 Apr 2010 14:55:38 +0000 (14:55 +0000)
committerbarate <barate>
Thu, 15 Apr 2010 14:55:38 +0000 (14:55 +0000)
In BatchManager_Local, jobs are now in "RUNNING" state until the end of file copy and cleanup.

src/Local/Batch_BatchManager_Local.cxx
src/Local/Batch_BatchManager_Local.hxx
src/Local/Test/Test_Local_RSH.cxx
src/Local/Test/Test_Local_SH.cxx
src/Local/Test/Test_Local_SSH.cxx
src/Python/Test/Test_Python_Local_SH.py

index e9797fc8e627021b0f161e21de0a9e2d6a5e4519..698f08de76178067c41269e08ca810804de62649 100644 (file)
@@ -67,14 +67,25 @@ namespace Batch {
       _idCounter(0)
   {
     pthread_mutex_init(&_threads_mutex, NULL);
-    pthread_cond_init(&_threadLaunchCondition, NULL);
+    pthread_cond_init(&_threadSyncCondition, NULL);
   }
 
   // Destructeur
   BatchManager_Local::~BatchManager_Local()
   {
+    for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
+      pthread_mutex_lock(&_threads_mutex);
+      string state = iter->second.param[STATE];
+      if (state != FINISHED && state != FAILED) {
+        UNDER_LOCK( cout << "Warning: Job " << iter->first <<
+                            " is not finished, it will now be canceled." << endl );
+        pthread_cancel(iter->second.thread_id);
+        pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+      }
+      pthread_mutex_unlock(&_threads_mutex);
+    }
     pthread_mutex_destroy(&_threads_mutex);
-    pthread_cond_destroy(&_threadLaunchCondition);
+    pthread_cond_destroy(&_threadSyncCondition);
   }
 
   const CommunicationProtocol & BatchManager_Local::getProtocol() const
@@ -110,7 +121,7 @@ namespace Batch {
       throw RunTimeException("Can't create new thread in BatchManager_Local");
     }
 
-    pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex);
+    pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
     pthread_mutex_unlock(&_threads_mutex);
 
     ostringstream id_sst;
@@ -233,23 +244,24 @@ namespace Batch {
     istringstream iss(jobid.getReference());
     iss >> id;
 
-    Status status;
-
     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     pthread_mutex_lock(&_threads_mutex);
-    status = _threads[id].status;
+    bool running = (_threads[id].param[STATE].str() == RUNNING);
     pthread_mutex_unlock(&_threads_mutex);
     // @@@ --------> SECTION CRITIQUE <-------- @@@
     //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
 
-    return (status == RUNNING);
+    return running;
   }
 
   // Methode de destruction d'un job
   void BatchManager_Local::cancel(pthread_t thread_id)
   {
+    pthread_mutex_lock(&_threads_mutex);
     pthread_cancel(thread_id);
+    pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+    pthread_mutex_unlock(&_threads_mutex);
   }
 
 
@@ -317,6 +329,8 @@ namespace Batch {
   // Methode d'execution du thread
   void * BatchManager_Local::ThreadAdapter::run(void * arg)
   {
+    ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+
 #ifndef WIN32
     // On bloque tous les signaux pour ce thread
     sigset_t setmask;
@@ -333,13 +347,10 @@ namespace Batch {
     // Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
     // sera prise en compte par pthread_testcancel()
     Process child;
+    pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
     pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
     pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
 
-    ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
-
-
-
 
     // Le code retour cumule (ORed) de tous les appels
     // Nul en cas de reussite de l'ensemble des operations
@@ -467,7 +478,10 @@ namespace Batch {
       }
     }
 
+    pthread_mutex_lock(&p_ta->_bm._threads_mutex);
 
+    // Set the job state to FINISHED or FAILED
+    p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
 
     // On retire la fonction de nettoyage de la memoire
     pthread_cleanup_pop(0);
@@ -475,7 +489,10 @@ namespace Batch {
     // On retire la fonction de suppression du fils
     pthread_cleanup_pop(0);
 
+    // remove setFailedOnCancel function from cancel stack
+    pthread_cleanup_pop(0);
 
+    pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
 
     // On invoque la fonction de nettoyage de la memoire
     delete_on_exit(arg);
@@ -510,14 +527,14 @@ namespace Batch {
 #ifndef WIN32
     _bm._threads[_id].pid       = child;
 #endif
-    _bm._threads[_id].status    = RUNNING;
+    _bm._threads[_id].hasFailed = false;
     _bm._threads[_id].param     = param;
     _bm._threads[_id].env       = env;
     _bm._threads[_id].command_queue.push(NOP);
 
     // Unlock the master thread. From here, all shared variables must be protected
     // from concurrent access
-    pthread_cond_signal(&_bm._threadLaunchCondition);
+    pthread_cond_signal(&_bm._threadSyncCondition);
 
 
     // on boucle en attendant que le fils ait termine
@@ -526,11 +543,6 @@ namespace Batch {
       DWORD exitCode;
       BOOL res = GetExitCodeProcess(child, &exitCode);
       if (exitCode != STILL_ACTIVE) {
-        pthread_mutex_lock(&_bm._threads_mutex);
-        _bm._threads[_id].status       = DONE;
-        _bm._threads[_id].param[STATE] = Batch::FINISHED;
-        pthread_mutex_unlock(&_bm._threads_mutex);
-        // @@@ --------> SECTION CRITIQUE <-------- @@@
         UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
         break;
       }
@@ -557,7 +569,6 @@ namespace Batch {
           // Le fils est simplement stoppe
           // @@@ --------> SECTION CRITIQUE <-------- @@@
           pthread_mutex_lock(&_bm._threads_mutex);
-          _bm._threads[_id].status       = STOPPED;
           _bm._threads[_id].param[STATE] = Batch::PAUSED;
           pthread_mutex_unlock(&_bm._threads_mutex);
           // @@@ --------> SECTION CRITIQUE <-------- @@@
@@ -566,12 +577,6 @@ namespace Batch {
         }
         else {
           // Le fils est termine, on sort de la boucle et du if englobant
-          // @@@ --------> SECTION CRITIQUE <-------- @@@
-          pthread_mutex_lock(&_bm._threads_mutex);
-          _bm._threads[_id].status       = DONE;
-          _bm._threads[_id].param[STATE] = Batch::FINISHED;
-          pthread_mutex_unlock(&_bm._threads_mutex);
-          // @@@ --------> SECTION CRITIQUE <-------- @@@
           UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
           break;
         }
@@ -580,8 +585,7 @@ namespace Batch {
         // Le fils a disparu ...
         // @@@ --------> SECTION CRITIQUE <-------- @@@
         pthread_mutex_lock(&_bm._threads_mutex);
-        _bm._threads[_id].status       = DEAD;
-        _bm._threads[_id].param[STATE] = Batch::FAILED;
+        _bm._threads[_id].hasFailed = true;
         pthread_mutex_unlock(&_bm._threads_mutex);
         // @@@ --------> SECTION CRITIQUE <-------- @@@
         UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
@@ -909,4 +913,15 @@ namespace Batch {
     delete p_ta;
   }
 
+  void BatchManager_Local::setFailedOnCancel(void * arg)
+  {
+    ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+    pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
+    p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
+    pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
+
+    // Unlock the master thread. From here, the batch manager instance should not be used.
+    pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
+  }
+
 }
index aed1fa5d4d27089e40de4e192075918ebc56d957..6f27b2a328e9d2d621f39b6245533d1433ea3d9f 100644 (file)
@@ -77,6 +77,7 @@ namespace Batch {
       ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id);
       static void * run(void * arg);
       BatchManager_Local & getBatchManager() const { return _bm; };
+      Id getId() const { return _id; };
 
     protected:
       BatchManager_Local & _bm;
@@ -102,20 +103,12 @@ namespace Batch {
       ALTER
     };
 
-    enum Status {
-      UNKNOWN = 0,
-      RUNNING,
-      STOPPED,
-      DONE,
-      DEAD
-    };
-
     struct Child {
       pthread_t thread_id;
       std::queue<Commande, std::deque<Commande> > command_queue;
       pid_t pid;
       int exit_code;
-      Status status;
+      bool hasFailed;
       Parametre param;
       Environnement env;
     };
@@ -164,7 +157,8 @@ namespace Batch {
     virtual void cancel(pthread_t thread_id);
     static  void kill_child_on_exit(void * p_pid);
     static  void delete_on_exit(void * arg);
-    pthread_cond_t _threadLaunchCondition;
+    static void setFailedOnCancel(void * arg);
+    pthread_cond_t _threadSyncCondition;
     Id _idCounter;
 
 #ifdef SWIG
index 8120e426e9b91143560a622c29ab33bb7c6dedae..700daf57d6927c9549f311147a5a160f92fec388 100644 (file)
@@ -64,7 +64,6 @@ int main(int argc, char** argv)
     const string & exechost = parser.getValue("TEST_LOCAL_RSH_EXECUTION_HOST");
     const string & user = parser.getValue("TEST_LOCAL_RSH_USER");
     int timeout = parser.getValueAsInt("TEST_LOCAL_RSH_TIMEOUT");
-    int finalizationTime = parser.getValueAsInt("TEST_LOCAL_RSH_FINALIZATION_TIME");
 
     // Define the job...
     Job job;
@@ -116,10 +115,6 @@ int main(int argc, char** argv)
 
     cout << "Job " << jobid.__repr__() << " is done" << endl;
 
-    // wait for the copy of output files and the cleanup
-    // (there's no cleaner way to do that yet)
-    sleep(finalizationTime);
-
   } catch (GenericException e) {
     cerr << "Error: " << e << endl;
     return 1;
index f7ed8e4fa271eb3df7a9ab6c5b1eca161015d34e..bcac2bfa984e7072ad538c6f762d405f8a1c524f 100644 (file)
@@ -63,7 +63,6 @@ int main(int argc, char** argv)
     parser.parseTestConfigFile();
     const string & workdir = parser.getValue("TEST_LOCAL_SH_WORK_DIR");
     int timeout = parser.getValueAsInt("TEST_LOCAL_SH_TIMEOUT");
-    int finalizationTime = parser.getValueAsInt("TEST_LOCAL_SH_FINALIZATION_TIME");
 
     // Define the job...
     Job job;
@@ -116,10 +115,6 @@ int main(int argc, char** argv)
 
     cout << "Job " << jobid.__repr__() << " is done" << endl;
 
-    // wait for the copy of output files and the cleanup
-    // (there's no cleaner way to do that yet)
-    sleep(finalizationTime);
-
   } catch (GenericException e) {
     cerr << "Error: " << e << endl;
     return 1;
index d216954e94f7699f0406782a42c2c709c7ec0252..9db89170df071dd36c09069886d58475e17213d4 100644 (file)
@@ -65,7 +65,6 @@ int main(int argc, char** argv)
     const string & exechost = parser.getValue("TEST_LOCAL_SSH_EXECUTION_HOST");
     const string & user = parser.getValue("TEST_LOCAL_SSH_USER");
     int timeout = parser.getValueAsInt("TEST_LOCAL_SSH_TIMEOUT");
-    int finalizationTime = parser.getValueAsInt("TEST_LOCAL_SSH_FINALIZATION_TIME");
 
     // Define the job...
     Job job;
@@ -119,10 +118,6 @@ int main(int argc, char** argv)
 
     cout << "Job " << jobid.__repr__() << " is done" << endl;
 
-    // wait for the copy of output files and the cleanup
-    // (there's no cleaner way to do that yet)
-    sleep(finalizationTime);
-
   } catch (GenericException e) {
     cerr << "Error: " << e << endl;
     return 1;
index 4427bab27bc95587ac3d8cfda914454a17916490..11194e40763a8297ea73679ba4a585d8d550fc4f 100644 (file)
@@ -87,10 +87,6 @@ def work():
 
     print "Job", jobid, "is done"
 
-    # wait for the copy of output files and the cleanup
-    # (there's no cleaner way to do that yet)
-    time.sleep(config.TEST_LOCAL_SH_FINALIZATION_TIME)
-
     # test the result file
     exp = "c = 12"
     f = open('result.txt')