From 2f998aca9d429a7d0c90c07271a8a204b5073be8 Mon Sep 17 00:00:00 2001 From: Ovidiu Mircescu Date: Thu, 7 Jun 2018 11:15:18 +0200 Subject: [PATCH] Use the single thread policy for Launcher Server. We have to deal with the fact that the Launcher is not thread safe and we have concurrency problems. A new test is added to illustrate the situation. --- src/Launcher/Launcher.cxx | 93 +++++--------------- src/Launcher/Launcher.hxx | 5 +- src/Launcher/SALOME_LauncherServer.cxx | 9 +- src/Launcher/Test/CMakeLists.txt | 6 ++ src/Launcher/Test/CTestTestfileInstall.cmake | 6 ++ src/Launcher/Test/launcher_use_case.py | 44 +++++++++ src/Launcher/Test/test_stress.sh | 24 +++++ 7 files changed, 110 insertions(+), 77 deletions(-) create mode 100644 src/Launcher/Test/launcher_use_case.py create mode 100755 src/Launcher/Test/test_stress.sh diff --git a/src/Launcher/Launcher.cxx b/src/Launcher/Launcher.cxx index cee16a829..96a1fafd0 100644 --- a/src/Launcher/Launcher.cxx +++ b/src/Launcher/Launcher.cxx @@ -41,39 +41,6 @@ using namespace std; -class ReadLock -{ -public: - ReadLock(pthread_rwlock_t * lock) - : _lock(lock) - { -// pthread_rwlock_rdlock(_lock); - pthread_rwlock_wrlock(_lock); - } - ~ReadLock() - { - pthread_rwlock_unlock(_lock); - } -private: - pthread_rwlock_t * _lock; -}; - -class WriteLock -{ -public: - WriteLock(pthread_rwlock_t * lock) - : _lock(lock) - { - pthread_rwlock_wrlock(_lock); - } - ~WriteLock() - { - pthread_rwlock_unlock(_lock); - } -private: - pthread_rwlock_t * _lock; -}; - //============================================================================= /*! 
* Constructor @@ -86,8 +53,6 @@ Launcher_cpp::Launcher_cpp() { LAUNCHER_MESSAGE("Launcher_cpp constructor"); _job_cpt = 0; - _lock = new pthread_rwlock_t(); - pthread_rwlock_init(_lock, NULL); } //============================================================================= @@ -106,9 +71,6 @@ Launcher_cpp::~Launcher_cpp() for(it1=_batchmap.begin();it1!=_batchmap.end();it1++) delete it1->second; #endif - - pthread_rwlock_destroy(_lock); - delete _lock; } #ifdef WITH_LIBBATCH @@ -121,7 +83,6 @@ Launcher_cpp::~Launcher_cpp() void Launcher_cpp::createJob(Launcher::Job * new_job) { - WriteLock mutex(_lock); LAUNCHER_MESSAGE("Creating a new job"); // Add job to the jobs map new_job->setNumber(_job_cpt); @@ -148,10 +109,9 @@ Launcher_cpp::createJob(Launcher::Job * new_job) void Launcher_cpp::launchJob(int job_id) { - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Launch a job"); - // Check if job exist + // Check if job exists Launcher::Job * job = findJob(job_id); // Check job state (cannot launch a job already launched...) 
@@ -161,15 +121,10 @@ Launcher_cpp::launchJob(int job_id) throw LauncherException("Bad state of the job: " + job->getState()); } - // Third step search batch manager for the job into the map -> instantiate one if does not exist - std::map::const_iterator it = _batchmap.find(job_id); - if(it == _batchmap.end()) - { - createBatchManagerForJob(job); - } + Batch::BatchManager * bm = getBatchManager(job); try { - Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob())); + Batch::JobId batch_manager_job_id = bm->submitJob(*(job->getBatchJob())); job->setBatchManagerJobId(batch_manager_job_id); job->setState("QUEUED"); job->setReference(batch_manager_job_id.getReference()); @@ -190,7 +145,6 @@ Launcher_cpp::launchJob(int job_id) const char * Launcher_cpp::getJobState(int job_id) { - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get job state"); // Check if job exist @@ -218,7 +172,6 @@ Launcher_cpp::getJobState(int job_id) const char * Launcher_cpp::getAssignedHostnames(int job_id) { - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get job assigned hostnames"); // Check if job exist @@ -236,7 +189,6 @@ Launcher_cpp::getAssignedHostnames(int job_id) void Launcher_cpp::getJobResults(int job_id, std::string directory) { - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get Job results"); Launcher::Job * job = findJob(job_id); @@ -264,7 +216,6 @@ Launcher_cpp::getJobResults(int job_id, std::string directory) void Launcher_cpp::clearJobWorkingDir(int job_id) { - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Clear the remote working directory"); Launcher::Job * job = findJob(job_id); @@ -289,7 +240,6 @@ bool Launcher_cpp::getJobDumpState(int job_id, std::string directory) { bool rtn; - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get Job dump state"); Launcher::Job * job = findJob(job_id); @@ -321,7 +271,6 @@ Launcher_cpp::getJobWorkFile(int job_id, std::string directory) { bool rtn; - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get working file " << work_file); Launcher::Job 
* job = findJob(job_id); @@ -350,7 +299,6 @@ Launcher_cpp::getJobWorkFile(int job_id, void Launcher_cpp::removeJob(int job_id) { - WriteLock mutex(_lock); LAUNCHER_MESSAGE("Remove Job"); // Check if job exist @@ -374,7 +322,6 @@ Launcher_cpp::removeJob(int job_id) void Launcher_cpp::stopJob(int job_id) { - ReadLock mutex(_lock); LAUNCHER_MESSAGE("Stop Job"); Launcher::Job * job = findJob(job_id); @@ -384,7 +331,6 @@ Launcher_cpp::stopJob(int job_id) std::string Launcher_cpp::dumpJob(int job_id) { - ReadLock mutex(_lock); LAUNCHER_MESSAGE("dump Job"); Launcher::Job * job = findJob(job_id); @@ -399,7 +345,9 @@ Launcher_cpp::restoreJob(const std::string& dumpedJob) int jobId = -1; try { - new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob); + { + new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob); + } if(new_job) { jobId = addJob(new_job); @@ -736,9 +684,11 @@ Launcher_cpp::getJobs() return _launcher_job_map; } -void -Launcher_cpp::createBatchManagerForJob(Launcher::Job * job) +#ifdef WITH_LIBBATCH +Batch::BatchManager* +Launcher_cpp::getBatchManager(Launcher::Job * job) { + Batch::BatchManager* result = nullptr; int job_id = job->getNumber(); // Select a resource for the job @@ -778,7 +728,6 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job) } // Step 2: We can now add a Factory if the resource is correctly define -#ifdef WITH_LIBBATCH std::map::const_iterator it = _batchmap.find(job_id); if(it == _batchmap.end()) { @@ -787,8 +736,8 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job) // Warning cannot write on one line like this, because map object is constructed before // the method is called... 
//_batchmap[job_id] = FactoryBatchManager(resource_definition); - Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition); - _batchmap[job_id] = batch_client; + result = FactoryBatchManager(resource_definition); + _batchmap[job_id] = result; } catch(const LauncherException &ex) { @@ -801,27 +750,28 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job) throw LauncherException(ex.message); } } -#endif + else + result = it->second; + return result; } +#endif void Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job) { - WriteLock mutex(_lock); // Step 0: Calculated job_id - int job_id = _job_cpt; + new_job->setNumber(_job_cpt); _job_cpt++; - new_job->setNumber(job_id); +#ifdef WITH_LIBBATCH // Step 1: check if resource is already in the map - createBatchManagerForJob(new_job); + Batch::BatchManager * bm = getBatchManager(new_job); // Step 2: add the job to the batch manager -#ifdef WITH_LIBBATCH try { - Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()), - new_job->getReference()); + Batch::JobId batch_manager_job_id = bm->addJob(*(new_job->getBatchJob()), + new_job->getReference()); new_job->setBatchManagerJobId(batch_manager_job_id); } catch(const Batch::GenericException &ex) @@ -928,7 +878,6 @@ Launcher_cpp::saveJobs(const char* jobs_file) // Create a sorted list from the internal job map list jobs_list; { - ReadLock mutex(_lock); for (int i=0; i<_job_cpt; i++) { map::const_iterator it_job = _launcher_job_map.find(i); diff --git a/src/Launcher/Launcher.hxx b/src/Launcher/Launcher.hxx index 17880ff5c..a8fbef7a8 100644 --- a/src/Launcher/Launcher.hxx +++ b/src/Launcher/Launcher.hxx @@ -35,8 +35,6 @@ #include #include -#include - class MpiImpl; namespace Batch{ @@ -87,7 +85,6 @@ public: // Useful methods long createJobWithFile(std::string xmlExecuteFile, std::string clusterName); std::map getJobs(); - void createBatchManagerForJob(Launcher::Job * job); void addJobDirectlyToMap(Launcher::Job * 
new_job); // Lib methods @@ -106,12 +103,12 @@ protected: #ifdef WITH_LIBBATCH Batch::BatchManager *FactoryBatchManager(ParserResourcesType& params); std::map _batchmap; + Batch::BatchManager* getBatchManager(Launcher::Job * job); #endif ParserLauncherType ParseXmlFile(std::string xmlExecuteFile); std::map _launcher_job_map; int _job_cpt; // job number counter - pthread_rwlock_t * _lock; }; #endif diff --git a/src/Launcher/SALOME_LauncherServer.cxx b/src/Launcher/SALOME_LauncherServer.cxx index e7da460f6..a69958cab 100644 --- a/src/Launcher/SALOME_LauncherServer.cxx +++ b/src/Launcher/SALOME_LauncherServer.cxx @@ -107,7 +107,14 @@ int main(int argc, char* argv[]) } try { - SALOME_Launcher *lServ(new SALOME_Launcher(orb,root_poa)); + CORBA::PolicyList policies; + policies.length(1); + PortableServer::ThreadPolicy_var threadPol(root_poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL)); + policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol); + PortableServer::POA_var safePOA = root_poa->create_POA("SingleThreadPOA",pman,policies); + threadPol->destroy(); + + SALOME_Launcher *lServ(new SALOME_Launcher(orb,safePOA)); lServ->_remove_ref(); // SALOMESDS::DataServerManager *dsm(new SALOMESDS::DataServerManager(argc,argv,orb,root_poa)); diff --git a/src/Launcher/Test/CMakeLists.txt b/src/Launcher/Test/CMakeLists.txt index 48378f35b..68037d26b 100644 --- a/src/Launcher/Test/CMakeLists.txt +++ b/src/Launcher/Test/CMakeLists.txt @@ -33,4 +33,10 @@ IF(NOT WIN32) INSTALL(FILES CTestTestfileInstall.cmake DESTINATION ${KERNEL_TEST_DIR}/Launcher RENAME CTestTestfile.cmake) + SET(STRESS_TEST_FILES + test_stress.sh + launcher_use_case.py + ) + INSTALL(PROGRAMS ${STRESS_TEST_FILES} + DESTINATION ${KERNEL_TEST_DIR}/Launcher) ENDIF() diff --git a/src/Launcher/Test/CTestTestfileInstall.cmake b/src/Launcher/Test/CTestTestfileInstall.cmake index 3d4df3a2d..9c7e27639 100644 --- a/src/Launcher/Test/CTestTestfileInstall.cmake +++ 
b/src/Launcher/Test/CTestTestfileInstall.cmake @@ -20,6 +20,12 @@ IF(NOT WIN32) SET(TEST_NAME ${COMPONENT_NAME}_Launcher) ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 test_launcher.py) + SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}" + # TIMEOUT 500 + ) + + SET(TEST_NAME ${COMPONENT_NAME}_StressLauncher) + ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 ./test_stress.sh) SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}" # TIMEOUT 500 ) diff --git a/src/Launcher/Test/launcher_use_case.py b/src/Launcher/Test/launcher_use_case.py new file mode 100644 index 000000000..eac14d71a --- /dev/null +++ b/src/Launcher/Test/launcher_use_case.py @@ -0,0 +1,44 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# classic use case of a job +import os +import salome +import tempfile +import time +import sys + +if __name__ == '__main__': + salome.salome_init() + launcher = salome.naming_service.Resolve('/SalomeLauncher') + job_params = salome.JobParameters() + job_params.resource_required = salome.ResourceParameters() + job_params.resource_required.name = "localhost" + job_params.resource_required.nb_proc = 1 # slurm: --ntasks + + job_params.job_type = "command" + #cwd = os.getcwd() + cwd = sys.argv[1] + case_dir = tempfile.mkdtemp(prefix="test") + job_params.local_directory = cwd + job_params.job_file = "command.sh" + job_params.work_directory = os.path.join(case_dir, "run") + job_params.result_directory = os.path.join(case_dir, "result") + job_params.out_files = ["result.txt"] + job_params.wckey="P11U5:CARBONES" + job_params.job_name = "MyJob" + job_id = launcher.createJob(job_params) + launcher.launchJob(job_id) + job_string = launcher.dumpJob(job_id) + jobState = launcher.getJobState(job_id) + while jobState != 'FINISHED' and jobState != 'FAILED': + time.sleep(1) + jobState = launcher.getJobState(job_id) + launcher.getJobResults(job_id, '') + launcher.getJobWorkFile(job_id, "result.txt", '') + 
launcher.clearJobWorkingDir(job_id) + launcher.removeJob(job_id) + new_id = launcher.restoreJob(job_string) + job_params_bis = launcher.getJobParameters(new_id) + jobState = launcher.getJobState(new_id) + launcher.removeJob(new_id) + launcher.saveJobs(os.path.join(case_dir,"savejobs.xml")) diff --git a/src/Launcher/Test/test_stress.sh b/src/Launcher/Test/test_stress.sh new file mode 100755 index 000000000..4ed5a050c --- /dev/null +++ b/src/Launcher/Test/test_stress.sh @@ -0,0 +1,24 @@ +#! /bin/bash +# This test launches in parallel a great number of instances of a usual use case +WORKDIR=`mktemp -d` +echo WORKDIR: $WORKDIR +cat > $WORKDIR/command.sh <<< 'echo "OK" > result.txt' +chmod 755 $WORKDIR/command.sh +for i in {1..500} +do + python launcher_use_case.py $WORKDIR 2> $WORKDIR/log$i.err & +done +exit_code=0 +for i in {1..500} +do + wait -n + ret=$? + if [ $ret -ne "0" ] + then + echo "Error detected!" + exit_code=1 + fi +done +# list the error files that are not empty +ls -l $WORKDIR/*.err | awk '{if ($5 != "0") print $0}' +exit $exit_code \ No newline at end of file -- 2.39.2