From 89a7a6c870f65a1a2f4c6820625d1364db9ba51f Mon Sep 17 00:00:00 2001 From: Ovidiu Mircescu Date: Thu, 26 Apr 2018 14:57:08 +0200 Subject: [PATCH] Try to make SALOME_Launcher thread safe. --- src/Launcher/Launcher.cxx | 73 +++++++++++++++++++++------ src/Launcher/Launcher.hxx | 2 +- src/Launcher/Launcher_Job_Command.cxx | 4 +- src/Launcher/Launcher_Job_SALOME.cxx | 4 +- 4 files changed, 63 insertions(+), 20 deletions(-) diff --git a/src/Launcher/Launcher.cxx b/src/Launcher/Launcher.cxx index 9fb381ff8..cee16a829 100644 --- a/src/Launcher/Launcher.cxx +++ b/src/Launcher/Launcher.cxx @@ -41,6 +41,39 @@ using namespace std; +class ReadLock +{ +public: + ReadLock(pthread_rwlock_t * lock) + : _lock(lock) + { +// pthread_rwlock_rdlock(_lock); + pthread_rwlock_wrlock(_lock); + } + ~ReadLock() + { + pthread_rwlock_unlock(_lock); + } +private: + pthread_rwlock_t * _lock; +}; + +class WriteLock +{ +public: + WriteLock(pthread_rwlock_t * lock) + : _lock(lock) + { + pthread_rwlock_wrlock(_lock); + } + ~WriteLock() + { + pthread_rwlock_unlock(_lock); + } +private: + pthread_rwlock_t * _lock; +}; + //============================================================================= /*! * Constructor @@ -53,8 +86,8 @@ Launcher_cpp::Launcher_cpp() { LAUNCHER_MESSAGE("Launcher_cpp constructor"); _job_cpt = 0; - _job_cpt_mutex = new pthread_mutex_t(); - pthread_mutex_init(_job_cpt_mutex, NULL); + _lock = new pthread_rwlock_t(); + pthread_rwlock_init(_lock, NULL); } //============================================================================= @@ -74,8 +107,8 @@ Launcher_cpp::~Launcher_cpp() delete it1->second; #endif - pthread_mutex_destroy(_job_cpt_mutex); - delete _job_cpt_mutex; + pthread_rwlock_destroy(_lock); + delete _lock; } #ifdef WITH_LIBBATCH @@ -88,12 +121,11 @@ Launcher_cpp::~Launcher_cpp() void Launcher_cpp::createJob(Launcher::Job * new_job) { + WriteLock mutex(_lock); LAUNCHER_MESSAGE("Creating a new job"); // Add job to the jobs map - pthread_mutex_lock(_job_cpt_mutex); new_job->setNumber(_job_cpt); _job_cpt++; - pthread_mutex_unlock(_job_cpt_mutex); std::map::const_iterator it_job = _launcher_job_map.find(new_job->getNumber()); if (it_job == _launcher_job_map.end()) { @@ -101,9 +133,9 @@ Launcher_cpp::createJob(Launcher::Job * new_job) } else { - LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber()); + LAUNCHER_INFOS("A job has already the same id: " << new_job->getNumber()); delete new_job; - throw LauncherException("A job as already the same id - job is not created !"); + throw LauncherException("A job has already the same id - job is not created !"); } LAUNCHER_MESSAGE("New Job created"); } @@ -116,6 +148,7 @@ Launcher_cpp::createJob(Launcher::Job * new_job) void Launcher_cpp::launchJob(int job_id) { + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Launch a job"); // Check if job exist @@ -157,6 +190,7 @@ Launcher_cpp::launchJob(int job_id) const char * Launcher_cpp::getJobState(int job_id) { + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get job state"); // Check if job exist @@ -184,6 +218,7 @@ Launcher_cpp::getJobState(int job_id) const char * Launcher_cpp::getAssignedHostnames(int job_id) { + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get job assigned hostnames"); // Check if job exist @@ -201,6 +236,7 @@ Launcher_cpp::getAssignedHostnames(int job_id) void Launcher_cpp::getJobResults(int job_id, std::string directory) { + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get Job results"); Launcher::Job * job = findJob(job_id); @@ -228,6 +264,7 @@ Launcher_cpp::getJobResults(int job_id, std::string directory) void Launcher_cpp::clearJobWorkingDir(int job_id) { + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Clear the remote working directory"); Launcher::Job * job = findJob(job_id); @@ -252,6 +289,7 @@ bool Launcher_cpp::getJobDumpState(int job_id, std::string directory) { bool rtn; + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get Job dump state"); Launcher::Job * job = findJob(job_id); @@ -283,6 +321,7 @@ Launcher_cpp::getJobWorkFile(int job_id, std::string directory) { bool rtn; + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Get working file " << work_file); Launcher::Job * job = findJob(job_id); @@ -311,6 +350,7 @@ Launcher_cpp::getJobWorkFile(int job_id, void Launcher_cpp::removeJob(int job_id) { + WriteLock mutex(_lock); LAUNCHER_MESSAGE("Remove Job"); // Check if job exist @@ -334,6 +374,7 @@ Launcher_cpp::removeJob(int job_id) void Launcher_cpp::stopJob(int job_id) { + ReadLock mutex(_lock); LAUNCHER_MESSAGE("Stop Job"); Launcher::Job * job = findJob(job_id); @@ -343,6 +384,7 @@ Launcher_cpp::stopJob(int job_id) std::string Launcher_cpp::dumpJob(int job_id) { + ReadLock mutex(_lock); LAUNCHER_MESSAGE("dump Job"); Launcher::Job * job = findJob(job_id); @@ -765,12 +807,11 @@ Launcher_cpp::createBatchManagerForJob(Launcher::Job * job) void Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job) { + WriteLock mutex(_lock); // Step 0: Calculated job_id - pthread_mutex_lock(_job_cpt_mutex); int job_id = _job_cpt; _job_cpt++; new_job->setNumber(job_id); - pthread_mutex_unlock(_job_cpt_mutex); // Step 1: check if resource is already in the map createBatchManagerForJob(new_job); @@ -886,13 +927,15 @@ Launcher_cpp::saveJobs(const char* jobs_file) { // Create a sorted list from the internal job map list jobs_list; - for (int i=0; i<_job_cpt; i++) { - map::const_iterator it_job = _launcher_job_map.find(i); - if (it_job != _launcher_job_map.end()) - jobs_list.push_back(it_job->second); + ReadLock mutex(_lock); + for (int i=0; i<_job_cpt; i++) + { + map::const_iterator it_job = _launcher_job_map.find(i); + if (it_job != _launcher_job_map.end()) + jobs_list.push_back(it_job->second); + } } - // Save the jobs in XML file Launcher::XML_Persistence::saveJobs(jobs_file, jobs_list); } diff --git a/src/Launcher/Launcher.hxx b/src/Launcher/Launcher.hxx index 43030dddf..17880ff5c 100644 --- a/src/Launcher/Launcher.hxx +++ b/src/Launcher/Launcher.hxx @@ -111,7 +111,7 @@ protected: std::map _launcher_job_map; int _job_cpt; // job number counter - pthread_mutex_t * _job_cpt_mutex; // mutex for job counter + pthread_rwlock_t * _lock; }; #endif diff --git a/src/Launcher/Launcher_Job_Command.cxx b/src/Launcher/Launcher_Job_Command.cxx index e4f18f96e..b71d73735 100644 --- a/src/Launcher/Launcher_Job_Command.cxx +++ b/src/Launcher/Launcher_Job_Command.cxx @@ -79,7 +79,7 @@ Launcher::Job_Command::buildCommandScript(Batch::Parametre params, std::string l launch_script_stream << "#!/bin/sh -f" << std::endl; launch_script_stream << "cd " << work_directory << std::endl; // remove the exit code from any previous execution - launch_script_stream << "rm -f logs/end_code.log" << std::endl; + launch_script_stream << "rm -f logs/exit_code.log" << std::endl; launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl; launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl; if (_env_file != "") @@ -101,7 +101,7 @@ Launcher::Job_Command::buildCommandScript(Batch::Parametre params, std::string l launch_script_stream << runCommandString() << std::endl; #ifndef WIN32 // log the exit code - launch_script_stream << "echo $? > logs/end_code.log" << std::endl; + launch_script_stream << "echo $? > logs/exit_code.log" << std::endl; #endif // Return diff --git a/src/Launcher/Launcher_Job_SALOME.cxx b/src/Launcher/Launcher_Job_SALOME.cxx index fa3f4b732..17e7e1766 100644 --- a/src/Launcher/Launcher_Job_SALOME.cxx +++ b/src/Launcher/Launcher_Job_SALOME.cxx @@ -80,7 +80,7 @@ Launcher::Job_SALOME::buildSalomeScript(Batch::Parametre params) launch_script_stream << "#!/bin/sh -f" << std::endl; launch_script_stream << "cd " << work_directory << std::endl; // remove the exit code from any previous execution - launch_script_stream << "rm -f logs/end_code.log" << std::endl; + launch_script_stream << "rm -f logs/exit_code.log" << std::endl; launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl; launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl; @@ -142,7 +142,7 @@ Launcher::Job_SALOME::buildSalomeScript(Batch::Parametre params) // Call real job type addJobTypeSpecificScript(launch_script_stream); // log the exit code - launch_script_stream << "echo $? > logs/end_code.log" << std::endl; + launch_script_stream << "echo $? > logs/exit_code.log" << std::endl; // End launch_script_stream << _resource_definition.AppliPath << "/salome kill \"$appli_port\"" << std::endl; -- 2.39.2