]> SALOME platform Git repositories - modules/yacs.git/commitdiff
Salome HOME
Use the single thread policy for Launcher Server.
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Thu, 7 Jun 2018 09:15:18 +0000 (11:15 +0200)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Thu, 7 Jun 2018 09:15:18 +0000 (11:15 +0200)
We have to deal with the fact that the Launcher is not thread safe and we
have concurrency problems. A new test is added to illustrate the situation.

src/Launcher/Launcher.cxx
src/Launcher/Launcher.hxx
src/Launcher/SALOME_LauncherServer.cxx
src/Launcher/Test/CMakeLists.txt
src/Launcher/Test/CTestTestfileInstall.cmake
src/Launcher/Test/launcher_use_case.py [new file with mode: 0644]
src/Launcher/Test/test_stress.sh [new file with mode: 0755]

index cee16a829fda59eb4f1498ef75f9a5b658bf1b74..96a1fafd0a87ce5abcfd6a59f466fb543b182944 100644 (file)
 
 using namespace std;
 
-class ReadLock
-{
-public:
-  ReadLock(pthread_rwlock_t * lock)
-  : _lock(lock)
-  {
-//    pthread_rwlock_rdlock(_lock);
-    pthread_rwlock_wrlock(_lock);
-  }
-  ~ReadLock()
-  {
-    pthread_rwlock_unlock(_lock);
-  }
-private:
-  pthread_rwlock_t * _lock;
-};
-
-class WriteLock
-{
-public:
-  WriteLock(pthread_rwlock_t * lock)
-  : _lock(lock)
-  {
-    pthread_rwlock_wrlock(_lock);
-  }
-  ~WriteLock()
-  {
-    pthread_rwlock_unlock(_lock);
-  }
-private:
-  pthread_rwlock_t * _lock;
-};
-
 //=============================================================================
 /*! 
  *  Constructor
@@ -86,8 +53,6 @@ Launcher_cpp::Launcher_cpp()
 {
   LAUNCHER_MESSAGE("Launcher_cpp constructor");
   _job_cpt = 0;
-  _lock = new pthread_rwlock_t();
-  pthread_rwlock_init(_lock, NULL);
 }
 
 //=============================================================================
@@ -106,9 +71,6 @@ Launcher_cpp::~Launcher_cpp()
   for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
     delete it1->second;
 #endif
-
-  pthread_rwlock_destroy(_lock);
-  delete _lock;
 }
 
 #ifdef WITH_LIBBATCH
@@ -121,7 +83,6 @@ Launcher_cpp::~Launcher_cpp()
 void 
 Launcher_cpp::createJob(Launcher::Job * new_job)
 {
-  WriteLock mutex(_lock);
   LAUNCHER_MESSAGE("Creating a new job");
   // Add job to the jobs map
   new_job->setNumber(_job_cpt);
@@ -148,10 +109,9 @@ Launcher_cpp::createJob(Launcher::Job * new_job)
 void 
 Launcher_cpp::launchJob(int job_id)
 {
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Launch a job");
 
-  // Check if job exist
+  // Check if job exists
   Launcher::Job * job = findJob(job_id);
 
   // Check job state (cannot launch a job already launched...)
@@ -161,15 +121,10 @@ Launcher_cpp::launchJob(int job_id)
     throw LauncherException("Bad state of the job: " + job->getState());
   }
 
-  // Third step search batch manager for the job into the map -> instantiate one if does not exist
-  std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
-  if(it == _batchmap.end())
-  {
-    createBatchManagerForJob(job);
-  }
+  Batch::BatchManager * bm = getBatchManager(job);
 
   try {
-    Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
+    Batch::JobId batch_manager_job_id = bm->submitJob(*(job->getBatchJob()));
     job->setBatchManagerJobId(batch_manager_job_id);
     job->setState("QUEUED");
     job->setReference(batch_manager_job_id.getReference());
@@ -190,7 +145,6 @@ Launcher_cpp::launchJob(int job_id)
 const char *
 Launcher_cpp::getJobState(int job_id)
 {
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Get job state");
 
   // Check if job exist
@@ -218,7 +172,6 @@ Launcher_cpp::getJobState(int job_id)
 const char *
 Launcher_cpp::getAssignedHostnames(int job_id)
 {
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Get job assigned hostnames");
 
   // Check if job exist
@@ -236,7 +189,6 @@ Launcher_cpp::getAssignedHostnames(int job_id)
 void
 Launcher_cpp::getJobResults(int job_id, std::string directory)
 {
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Get Job results");
 
   Launcher::Job * job = findJob(job_id);
@@ -264,7 +216,6 @@ Launcher_cpp::getJobResults(int job_id, std::string directory)
 void
 Launcher_cpp::clearJobWorkingDir(int job_id)
 {
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Clear the remote working directory");
 
   Launcher::Job * job = findJob(job_id);
@@ -289,7 +240,6 @@ bool
 Launcher_cpp::getJobDumpState(int job_id, std::string directory)
 {
   bool rtn;
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Get Job dump state");
 
   Launcher::Job * job = findJob(job_id);
@@ -321,7 +271,6 @@ Launcher_cpp::getJobWorkFile(int job_id,
                              std::string directory)
 {
   bool rtn;
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Get working file " << work_file);
 
   Launcher::Job * job = findJob(job_id);
@@ -350,7 +299,6 @@ Launcher_cpp::getJobWorkFile(int job_id,
 void
 Launcher_cpp::removeJob(int job_id)
 {
-  WriteLock mutex(_lock);
   LAUNCHER_MESSAGE("Remove Job");
 
   // Check if job exist
@@ -374,7 +322,6 @@ Launcher_cpp::removeJob(int job_id)
 void
 Launcher_cpp::stopJob(int job_id)
 {
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("Stop Job");
 
   Launcher::Job * job = findJob(job_id);
@@ -384,7 +331,6 @@ Launcher_cpp::stopJob(int job_id)
 std::string
 Launcher_cpp::dumpJob(int job_id)
 {
-  ReadLock mutex(_lock);
   LAUNCHER_MESSAGE("dump Job");
 
   Launcher::Job * job = findJob(job_id);
@@ -399,7 +345,9 @@ Launcher_cpp::restoreJob(const std::string& dumpedJob)
   int jobId = -1;
   try
   {
-    new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
+    {
+      new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
+    }
     if(new_job)
     {
       jobId = addJob(new_job);
@@ -736,9 +684,11 @@ Launcher_cpp::getJobs()
   return _launcher_job_map;
 }
 
-void 
-Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
+#ifdef WITH_LIBBATCH
+Batch::BatchManager*
+Launcher_cpp::getBatchManager(Launcher::Job * job)
 {
+  Batch::BatchManager* result = nullptr;
   int job_id = job->getNumber();
 
   // Select a resource for the job
@@ -778,7 +728,6 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
   }
 
   // Step 2: We can now add a Factory if the resource is correctly define
-#ifdef WITH_LIBBATCH
   std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
   if(it == _batchmap.end())
   {
@@ -787,8 +736,8 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
       // Warning cannot write on one line like this, because map object is constructed before
       // the method is called...
       //_batchmap[job_id] = FactoryBatchManager(resource_definition);
-      Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
-      _batchmap[job_id] = batch_client;
+      result = FactoryBatchManager(resource_definition);
+      _batchmap[job_id] = result;
     }
     catch(const LauncherException &ex)
     {
@@ -801,27 +750,28 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
       throw LauncherException(ex.message);
     }
   }
-#endif
+  else
+    result = it->second;
+  return result;
 }
+#endif
 
 void 
 Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
 {
-  WriteLock mutex(_lock);
   // Step 0: Calculated job_id
-  int job_id = _job_cpt;
+  new_job->setNumber(_job_cpt);
   _job_cpt++;
-  new_job->setNumber(job_id);
 
+#ifdef WITH_LIBBATCH
   // Step 1: check if resource is already in the map
-  createBatchManagerForJob(new_job);
+  Batch::BatchManager * bm = getBatchManager(new_job);
 
   // Step 2: add the job to the batch manager
-#ifdef WITH_LIBBATCH
   try
   {
-    Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
-                                                                  new_job->getReference());
+    Batch::JobId batch_manager_job_id = bm->addJob(*(new_job->getBatchJob()),
+                                                   new_job->getReference());
     new_job->setBatchManagerJobId(batch_manager_job_id);
   }
   catch(const Batch::GenericException &ex)
@@ -928,7 +878,6 @@ Launcher_cpp::saveJobs(const char* jobs_file)
   // Create a sorted list from the internal job map
   list<const Launcher::Job *> jobs_list;
   {
-    ReadLock mutex(_lock);
     for (int i=0; i<_job_cpt; i++)
     {
       map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
index 17880ff5cde219785b62a50606413baaf508e7c8..a8fbef7a8b37d4723bf0e19370dc36898b24a6ca 100644 (file)
@@ -35,8 +35,6 @@
 #include <vector>
 #include <list>
 
-#include <pthread.h>
-
 class MpiImpl;
 
 namespace Batch{
@@ -87,7 +85,6 @@ public:
   // Useful methods
   long createJobWithFile(std::string xmlExecuteFile, std::string clusterName);
   std::map<int, Launcher::Job *> getJobs();
-  void createBatchManagerForJob(Launcher::Job * job);
   void addJobDirectlyToMap(Launcher::Job * new_job);
 
   // Lib methods
@@ -106,12 +103,12 @@ protected:
 #ifdef WITH_LIBBATCH
   Batch::BatchManager *FactoryBatchManager(ParserResourcesType& params);
   std::map <int, Batch::BatchManager*> _batchmap;
+  Batch::BatchManager* getBatchManager(Launcher::Job * job);
 #endif
   ParserLauncherType ParseXmlFile(std::string xmlExecuteFile);
 
   std::map <int, Launcher::Job *> _launcher_job_map;  
   int _job_cpt; // job number counter
-  pthread_rwlock_t * _lock;
 };
 
 #endif
index e7da460f6bbecc86e6a3eaf99eb254fac26569a7..a69958cab827957e945861b7bc453aa084e4ee36 100644 (file)
@@ -107,7 +107,14 @@ int main(int argc, char* argv[])
   }
   try
     {
-      SALOME_Launcher *lServ(new SALOME_Launcher(orb,root_poa));
+      CORBA::PolicyList policies;
+      policies.length(1);
+      PortableServer::ThreadPolicy_var threadPol(root_poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL));
+      policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol);
+      PortableServer::POA_var safePOA = root_poa->create_POA("SingleThreadPOA",pman,policies);
+      threadPol->destroy();
+
+      SALOME_Launcher *lServ(new SALOME_Launcher(orb,safePOA));
       lServ->_remove_ref();
       //
       SALOMESDS::DataServerManager *dsm(new SALOMESDS::DataServerManager(argc,argv,orb,root_poa));
index 48378f35b4177eba7da8bfadd5215f29cc09e5f6..68037d26b0b17173ed5961f64714d77b1ee22ecf 100644 (file)
@@ -33,4 +33,10 @@ IF(NOT WIN32)
   INSTALL(FILES CTestTestfileInstall.cmake
           DESTINATION ${KERNEL_TEST_DIR}/Launcher
           RENAME CTestTestfile.cmake)
+  SET(STRESS_TEST_FILES
+      test_stress.sh
+      launcher_use_case.py
+      )
+  INSTALL(PROGRAMS ${STRESS_TEST_FILES}
+          DESTINATION ${KERNEL_TEST_DIR}/Launcher)
 ENDIF()
index 3d4df3a2d0251415da7372351d6552d0fe1342de..9c7e2763954d04c6ef7965f46c232ac0d5a387e2 100644 (file)
 IF(NOT WIN32)
   SET(TEST_NAME ${COMPONENT_NAME}_Launcher)
   ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 test_launcher.py)
+  SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}"
+    #                                                 TIMEOUT 500
+    )
+
+  SET(TEST_NAME ${COMPONENT_NAME}_StressLauncher)
+  ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 ./test_stress.sh)
   SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}"
     #                                                 TIMEOUT 500
     )
diff --git a/src/Launcher/Test/launcher_use_case.py b/src/Launcher/Test/launcher_use_case.py
new file mode 100644 (file)
index 0000000..eac14d7
--- /dev/null
@@ -0,0 +1,44 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# classic use case of a job
+import os
+import salome
+import tempfile
+import time
+import sys
+
+if __name__ == '__main__':
+  salome.salome_init()
+  launcher = salome.naming_service.Resolve('/SalomeLauncher')
+  job_params = salome.JobParameters()
+  job_params.resource_required = salome.ResourceParameters()
+  job_params.resource_required.name = "localhost"
+  job_params.resource_required.nb_proc = 1 # slurm: --ntasks
+
+  job_params.job_type = "command"
+  #cwd = os.getcwd()
+  cwd = sys.argv[1]
+  case_dir = tempfile.mkdtemp(prefix="test")
+  job_params.local_directory = cwd
+  job_params.job_file = "command.sh"
+  job_params.work_directory = os.path.join(case_dir, "run")
+  job_params.result_directory = os.path.join(case_dir, "result")
+  job_params.out_files = ["result.txt"]
+  job_params.wckey="P11U5:CARBONES"
+  job_params.job_name = "MyJob"
+  job_id = launcher.createJob(job_params)
+  launcher.launchJob(job_id)
+  job_string = launcher.dumpJob(job_id)
+  jobState = launcher.getJobState(job_id)
+  while jobState != 'FINISHED' and jobState != 'FAILED':
+      time.sleep(1)
+      jobState = launcher.getJobState(job_id)
+  launcher.getJobResults(job_id, '')
+  launcher.getJobWorkFile(job_id, "result.txt", '')
+  launcher.clearJobWorkingDir(job_id)
+  launcher.removeJob(job_id)
+  new_id = launcher.restoreJob(job_string)
+  job_params_bis = launcher.getJobParameters(new_id)
+  jobState = launcher.getJobState(new_id)
+  launcher.removeJob(new_id)
+  launcher.saveJobs(os.path.join(case_dir,"savejobs.xml"))
diff --git a/src/Launcher/Test/test_stress.sh b/src/Launcher/Test/test_stress.sh
new file mode 100755 (executable)
index 0000000..4ed5a05
--- /dev/null
@@ -0,0 +1,24 @@
+#! /bin/bash
+# This test launches in parallel a greate number of instancies of a usual use case
+WORKDIR=`mktemp -d`
+echo WORKDIR: $WORKDIR
+cat > $WORKDIR/command.sh <<< 'echo "OK" > result.txt'
+chmod 755 $WORKDIR/command.sh
+for i in {1..500}
+do
+  python launcher_use_case.py $WORKDIR 2> $WORKDIR/log$i.err &
+done
+exit_code=0
+for i in {1..500}
+do
+  wait -n
+  ret=$?
+  if [ $ret -ne "0" ]
+  then
+     echo "Error detected!"
+     exit_code=1
+  fi
+done
+# list of error files not empty
+ls -l $WORKDIR/*.err | awk '{if ($5 != "0") print $0}'
+exit $exit_code
\ No newline at end of file