From b4f7f7b5ca2813062b2328697572c09546eeb40f Mon Sep 17 00:00:00 2001 From: Ovidiu Mircescu Date: Mon, 23 Apr 2018 14:15:55 +0200 Subject: [PATCH] Job serialization and restoration. --- bin/runSession.py | 4 +- idl/SALOME_Launcher.idl | 9 +- src/Launcher/Launcher.cxx | 230 ++++++++++++---------- src/Launcher/Launcher.hxx | 5 + src/Launcher/Launcher_Job_Command.cxx | 6 + src/Launcher/Launcher_Job_SALOME.cxx | 5 + src/Launcher/Launcher_XML_Persistence.cxx | 68 +++++++ src/Launcher/Launcher_XML_Persistence.hxx | 3 + src/Launcher/SALOME_Launcher.cxx | 37 ++++ src/Launcher/SALOME_Launcher.hxx | 2 + src/ResourcesManager/ResourcesManager.cxx | 2 +- 11 files changed, 262 insertions(+), 109 deletions(-) diff --git a/bin/runSession.py b/bin/runSession.py index 8045a62e6..bf0c5c6b7 100644 --- a/bin/runSession.py +++ b/bin/runSession.py @@ -193,6 +193,7 @@ def __runLocalSession(command): for cmd in command: single_cmd = cmd.strip().split(' ') any_error = False + error_code = 1 try: proc = subprocess.Popen(single_cmd) (stdoutdata, stderrdata) = proc.communicate() # Wait for process to terminate @@ -203,6 +204,7 @@ def __runLocalSession(command): if proc.returncode != 0: any_error = True + error_code = proc.returncode except: any_error = True pass @@ -213,7 +215,7 @@ def __runLocalSession(command): sys.stdout.write("".join(outmsg)) if errmsg: sys.stderr.write("".join(errmsg)) - sys.exit(1) + sys.exit(error_code) return 0 else: diff --git a/idl/SALOME_Launcher.idl b/idl/SALOME_Launcher.idl index 954b94e1f..4da1c6c5b 100644 --- a/idl/SALOME_Launcher.idl +++ b/idl/SALOME_Launcher.idl @@ -285,6 +285,14 @@ interface SalomeLauncher //! Kill the job and remove it from the jobs list void removeJob (in long job_id) raises (SALOME::SALOME_Exception); + //! Get the job's serialization string + string dumpJob(in long job_id) raises (SALOME::SALOME_Exception); + //! Create a job from its serialization string + /*! \param dumpedJob Serialization string returned by dumpJob. + \return Job id + */ + long restoreJob(in string dumpedJob) raises (SALOME::SALOME_Exception); + // Useful methods long createJobWithFile(in string xmlJobFile, in string clusterName) raises (SALOME::SALOME_Exception); boolean testBatch (in ResourceParameters params) raises (SALOME::SALOME_Exception); @@ -307,7 +315,6 @@ interface SalomeLauncher void loadJobs(in string jobs_file) raises (SALOME::SALOME_Exception); //! Save the current list of jobs in an xml file. void saveJobs(in string jobs_file) raises (SALOME::SALOME_Exception); - }; }; diff --git a/src/Launcher/Launcher.cxx b/src/Launcher/Launcher.cxx index 5db6b8df3..9fb381ff8 100644 --- a/src/Launcher/Launcher.cxx +++ b/src/Launcher/Launcher.cxx @@ -96,7 +96,9 @@ Launcher_cpp::createJob(Launcher::Job * new_job) pthread_mutex_unlock(_job_cpt_mutex); std::map::const_iterator it_job = _launcher_job_map.find(new_job->getNumber()); if (it_job == _launcher_job_map.end()) + { _launcher_job_map[new_job->getNumber()] = new_job; + } else { LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber()); @@ -117,14 +119,7 @@ Launcher_cpp::launchJob(int job_id) LAUNCHER_MESSAGE("Launch a job"); // Check if job exist - std::map::const_iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) - { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); - } - - Launcher::Job * job = it_job->second; + Launcher::Job * job = findJob(job_id); // Check job state (cannot launch a job already launched...) if (job->getState() != "CREATED") @@ -134,13 +129,11 @@ Launcher_cpp::launchJob(int job_id) } // Third step search batch manager for the job into the map -> instantiate one if does not exist -#ifdef WITH_LIBBATCH std::map::const_iterator it = _batchmap.find(job_id); if(it == _batchmap.end()) { createBatchManagerForJob(job); } -#endif try { Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob())); @@ -167,14 +160,7 @@ Launcher_cpp::getJobState(int job_id) LAUNCHER_MESSAGE("Get job state"); // Check if job exist - std::map::const_iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) - { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); - } - - Launcher::Job * job = it_job->second; + Launcher::Job * job = findJob(job_id); std::string state; try @@ -201,14 +187,7 @@ Launcher_cpp::getAssignedHostnames(int job_id) LAUNCHER_MESSAGE("Get job assigned hostnames"); // Check if job exist - std::map::const_iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) - { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); - } - - Launcher::Job * job = it_job->second; + Launcher::Job * job = findJob(job_id); std::string assigned_hostnames = job->getAssignedHostnames(); return assigned_hostnames.c_str(); @@ -224,15 +203,7 @@ Launcher_cpp::getJobResults(int job_id, std::string directory) { LAUNCHER_MESSAGE("Get Job results"); - // Check if job exist - std::map::const_iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) - { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); - } - - Launcher::Job * job = it_job->second; + Launcher::Job * job = findJob(job_id); std::string resource_name = job->getResourceDefinition().Name; try { @@ -259,15 +230,7 @@ Launcher_cpp::clearJobWorkingDir(int job_id) { LAUNCHER_MESSAGE("Clear the remote working directory"); - // Check if job exist - std::map::const_iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) - { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); - } - - Launcher::Job * job = it_job->second; + Launcher::Job * job = findJob(job_id); try { _batchmap[job_id]->clearWorkingDir(*(job->getBatchJob())); @@ -291,15 +254,7 @@ Launcher_cpp::getJobDumpState(int job_id, std::string directory) bool rtn; LAUNCHER_MESSAGE("Get Job dump state"); - // Check if job exist - std::map::const_iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) - { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); - } - - Launcher::Job * job = it_job->second; + Launcher::Job * job = findJob(job_id); std::string resource_name = job->getResourceDefinition().Name; try { @@ -330,15 +285,7 @@ Launcher_cpp::getJobWorkFile(int job_id, bool rtn; LAUNCHER_MESSAGE("Get working file " << work_file); - // Check if job exist - std::map::const_iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) - { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); - } - - Launcher::Job * job = it_job->second; + Launcher::Job * job = findJob(job_id); std::string resource_name = job->getResourceDefinition().Name; try { @@ -389,15 +336,42 @@ Launcher_cpp::stopJob(int job_id) { LAUNCHER_MESSAGE("Stop Job"); - // Check if job exist - std::map::iterator it_job = _launcher_job_map.find(job_id); - if (it_job == _launcher_job_map.end()) + Launcher::Job * job = findJob(job_id); + job->stopJob(); +} + +std::string +Launcher_cpp::dumpJob(int job_id) +{ + LAUNCHER_MESSAGE("dump Job"); + + Launcher::Job * job = findJob(job_id); + return Launcher::XML_Persistence::dumpJob(*job); +} + +int +Launcher_cpp::restoreJob(const std::string& dumpedJob) +{ + LAUNCHER_MESSAGE("restore Job"); + Launcher::Job * new_job=NULL; + int jobId = -1; + try { - LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); - throw LauncherException("Cannot find the job, is it created ?"); + new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob); + if(new_job) + { + jobId = addJob(new_job); + if(jobId < 0) + delete new_job; + } } - - it_job->second->stopJob(); + catch(const LauncherException &ex) + { + LAUNCHER_INFOS("Cannot load the job. Exception: " << ex.msg.c_str()); + if(new_job) + delete new_job; + } + return jobId; } //============================================================================= @@ -649,6 +623,24 @@ Launcher_cpp::stopJob(int job_id) "(libBatch was not present at compilation time)"); } +std::string +Launcher_cpp::dumpJob(int job_id) +{ + LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot dump job!!!"); + throw LauncherException("Method Launcher_cpp::dumpJob is not available " + "(libBatch was not present at compilation time)"); + return ""; +} + +int +Launcher_cpp::restoreJob(const std::string& dumpedJob) +{ + LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot restore job!!!"); + throw LauncherException("Method Launcher_cpp::restoreJob is not available " + "(libBatch was not present at compilation time)"); + return 0; +} + long Launcher_cpp::createJobWithFile( const std::string xmlExecuteFile, std::string clusterName) { @@ -800,7 +792,9 @@ Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job) // Step 3: add job to launcher map std::map::const_iterator it_job = _launcher_job_map.find(new_job->getNumber()); if (it_job == _launcher_job_map.end()) + { _launcher_job_map[new_job->getNumber()] = new_job; + } else { LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber()); @@ -811,6 +805,50 @@ Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job) #endif } +int +Launcher_cpp::addJob(Launcher::Job * new_job) +{ + string job_state = new_job->getState(); + int jobId = -1; + if (job_state == "CREATED") + { + // In this case, we ignore run_part information + createJob(new_job); + jobId = new_job->getNumber(); + } + else if (job_state == "QUEUED" || + job_state == "RUNNING" || + job_state == "IN_PROCESS" || + job_state == "PAUSED") + { + addJobDirectlyToMap(new_job); + jobId = new_job->getNumber(); + + // We check that the BatchManager could resume the job +#ifdef WITH_LIBBATCH + if (new_job->getBatchManagerJobId().getReference() != new_job->getReference()) + { + LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR"); + new_job->setState("ERROR"); + } +#endif + } + else if (job_state == "FINISHED" || + job_state == "FAILED" || + job_state == "ERROR") + { + // We add run_part information + addJobDirectlyToMap(new_job); + jobId = new_job->getNumber(); + } + else + { + LAUNCHER_INFOS("A bad job is found, state unknown " << job_state); + jobId = -1; + } + return jobId; +} + list Launcher_cpp::loadJobs(const char* jobs_file) { @@ -824,47 +862,14 @@ Launcher_cpp::loadJobs(const char* jobs_file) for (it_job = jobs_list.begin(); it_job != jobs_list.end(); it_job++) { Launcher::Job * new_job = *it_job; - string job_state = new_job->getState(); - + int jobId = -1; try { - if (job_state == "CREATED") - { - // In this case, we ignore run_part information - createJob(new_job); - new_jobs_id_list.push_back(new_job->getNumber()); - } - else if (job_state == "QUEUED" || - job_state == "RUNNING" || - job_state == "IN_PROCESS" || - job_state == "PAUSED") - { - addJobDirectlyToMap(new_job); - new_jobs_id_list.push_back(new_job->getNumber()); - - // Step 4: We check that the BatchManager could resume - // the job -#ifdef WITH_LIBBATCH - if (new_job->getBatchManagerJobId().getReference() != new_job->getReference()) - { - LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR"); - new_job->setState("ERROR"); - } -#endif - } - else if (job_state == "FINISHED" || - job_state == "FAILED" || - job_state == "ERROR") - { - // Step 2: We add run_part information - addJobDirectlyToMap(new_job); - new_jobs_id_list.push_back(new_job->getNumber()); - } + jobId = addJob(new_job); + if(jobId >= 0) + new_jobs_id_list.push_back(jobId); else - { - LAUNCHER_INFOS("A bad job is found, state unknown " << job_state); delete new_job; - } } catch(const LauncherException &ex) { @@ -891,3 +896,16 @@ Launcher_cpp::saveJobs(const char* jobs_file) // Save the jobs in XML file Launcher::XML_Persistence::saveJobs(jobs_file, jobs_list); } + +Launcher::Job * +Launcher_cpp::findJob(int job_id) +{ + std::map::const_iterator it_job = _launcher_job_map.find(job_id); + if (it_job == _launcher_job_map.end()) + { + LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id); + throw LauncherException("Cannot find the job, is it created ?"); + } + Launcher::Job * job = it_job->second; + return job; +} \ No newline at end of file diff --git a/src/Launcher/Launcher.hxx b/src/Launcher/Launcher.hxx index 728694edd..43030dddf 100644 --- a/src/Launcher/Launcher.hxx +++ b/src/Launcher/Launcher.hxx @@ -73,6 +73,8 @@ public: bool getJobWorkFile(int job_id, std::string work_file, std::string directory); void stopJob(int job_id); void removeJob(int job_id); + std::string dumpJob(int job_id); + int restoreJob(const std::string& dumpedJob); /*! Load the jobs from the file "jobs_file" and add them to the Launcher. * Return a list with the IDs of the jobs that were successfully loaded. @@ -96,6 +98,9 @@ public: protected: virtual void notifyObservers(const std::string & event_name, const std::string & event_data) {} + int addJob(Launcher::Job * new_job); + + Launcher::Job * findJob(int job_id); // Methods used by user interface methods #ifdef WITH_LIBBATCH diff --git a/src/Launcher/Launcher_Job_Command.cxx b/src/Launcher/Launcher_Job_Command.cxx index c725dc24f..e4f18f96e 100644 --- a/src/Launcher/Launcher_Job_Command.cxx +++ b/src/Launcher/Launcher_Job_Command.cxx @@ -78,6 +78,8 @@ Launcher::Job_Command::buildCommandScript(Batch::Parametre params, std::string l #ifndef WIN32 launch_script_stream << "#!/bin/sh -f" << std::endl; launch_script_stream << "cd " << work_directory << std::endl; + // remove the exit code from any previous execution + launch_script_stream << "rm -f logs/end_code.log" << std::endl; launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl; launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl; if (_env_file != "") @@ -97,6 +99,10 @@ Launcher::Job_Command::buildCommandScript(Batch::Parametre params, std::string l } #endif launch_script_stream << runCommandString() << std::endl; +#ifndef WIN32 + // log the exit code + launch_script_stream << "echo $? > logs/end_code.log" << std::endl; +#endif // Return launch_script_stream.flush(); diff --git a/src/Launcher/Launcher_Job_SALOME.cxx b/src/Launcher/Launcher_Job_SALOME.cxx index ee2812c1b..fa3f4b732 100644 --- a/src/Launcher/Launcher_Job_SALOME.cxx +++ b/src/Launcher/Launcher_Job_SALOME.cxx @@ -79,6 +79,9 @@ Launcher::Job_SALOME::buildSalomeScript(Batch::Parametre params) // Begin of script launch_script_stream << "#!/bin/sh -f" << std::endl; launch_script_stream << "cd " << work_directory << std::endl; + // remove the exit code from any previous execution + launch_script_stream << "rm -f logs/end_code.log" << std::endl; + launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl; launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl; if (_env_file != "") @@ -138,6 +141,8 @@ Launcher::Job_SALOME::buildSalomeScript(Batch::Parametre params) // Call real job type addJobTypeSpecificScript(launch_script_stream); + // log the exit code + launch_script_stream << "echo $? > logs/end_code.log" << std::endl; // End launch_script_stream << _resource_definition.AppliPath << "/salome kill \"$appli_port\"" << std::endl; diff --git a/src/Launcher/Launcher_XML_Persistence.cxx b/src/Launcher/Launcher_XML_Persistence.cxx index f7db94e58..3e4c50df4 100644 --- a/src/Launcher/Launcher_XML_Persistence.cxx +++ b/src/Launcher/Launcher_XML_Persistence.cxx @@ -515,4 +515,72 @@ XML_Persistence::addAttr(xmlNodePtr node, const string & name, const string & va xmlFree(xmlStrValue); } +Job* +XML_Persistence::createJobFromString(const std::string& jobDump) +{ + xmlDocPtr doc; + doc = xmlReadMemory(jobDump.c_str(), jobDump.length(), "noname.xml", NULL, 0); + if (doc == NULL) + { + std::string error = "Error in xmlReadMemory in XML_Persistence::createJobFromString, could not parse string: " + jobDump; + LAUNCHER_INFOS(error); + throw LauncherException(error); + } + + // Step 3: Find jobs + Job * result = NULL; + xmlNodePtr root_node = xmlDocGetRootElement(doc); + if (xmlStrToString(root_node->name) == "jobs") + { + xmlNodePtr xmlCurrentNode = root_node->xmlChildrenNode; + while(xmlCurrentNode != NULL && result == NULL) + { + if (xmlStrToString(xmlCurrentNode->name) == "job") + { + LAUNCHER_INFOS("A job is found"); + result = createJobFromXmlNode(xmlCurrentNode); + } + xmlCurrentNode = xmlCurrentNode->next; + } + } + else + { + xmlFreeDoc(doc); + std::string error = "Error while parsing job dump: " + jobDump; + LAUNCHER_INFOS(error); + throw LauncherException(error); + } + + // Clean + xmlFreeDoc(doc); + return result; +} + +std::string +XML_Persistence::dumpJob(const Job& job) +{ + // Initialization + xmlKeepBlanksDefault(0); + xmlDocPtr doc = xmlNewDoc(xmlCharStrdup("1.0")); + xmlNodePtr root_node = xmlNewNode(NULL, xmlCharStrdup("jobs")); + xmlDocSetRootElement(doc, root_node); + xmlNodePtr doc_comment = xmlNewDocComment(doc, xmlCharStrdup("SALOME Launcher job")); + xmlAddPrevSibling(root_node, doc_comment); + + addJobToXmlDocument(root_node, job); + + // Final step: write to result + xmlChar *xmlbuff; + int buffersize; + xmlDocDumpFormatMemory(doc, &xmlbuff, &buffersize, 1); + std::string result; + if(buffersize > 0) + result = (const char*) xmlbuff; + + // Clean + xmlFree(xmlbuff); + xmlFreeDoc(doc); + return result; +} + } diff --git a/src/Launcher/Launcher_XML_Persistence.hxx b/src/Launcher/Launcher_XML_Persistence.hxx index 42756a2d2..d80d3d80d 100644 --- a/src/Launcher/Launcher_XML_Persistence.hxx +++ b/src/Launcher/Launcher_XML_Persistence.hxx @@ -48,6 +48,9 @@ namespace Launcher //! Save the jobs in the list "jobs_list" to the XML file "jobs_file". static void saveJobs(const char* jobs_file, const std::list & jobs_list); + static Job* createJobFromString(const std::string& jobDump); + static std::string dumpJob(const Job& job); + private: // This class is static only, not instanciable XML_Persistence() {} diff --git a/src/Launcher/SALOME_Launcher.cxx b/src/Launcher/SALOME_Launcher.cxx index cc7129bf0..b35e1c563 100644 --- a/src/Launcher/SALOME_Launcher.cxx +++ b/src/Launcher/SALOME_Launcher.cxx @@ -373,6 +373,43 @@ SALOME_Launcher::stopJob(CORBA::Long job_id) } } +char * +SALOME_Launcher::dumpJob(CORBA::Long job_id) +{ + std::string result; + try + { + result = _l.dumpJob(job_id); + } + catch(const LauncherException &ex) + { + INFOS(ex.msg.c_str()); + THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM); + } + return CORBA::string_dup(result.c_str()); +} + +CORBA::Long +SALOME_Launcher::restoreJob(const char * dumpedJob) +{ + CORBA::Long jobId; + try{ + jobId = _l.restoreJob(dumpedJob); + if(jobId >= 0) + { + std::ostringstream job_str; + job_str << jobId; + notifyObservers("NEW_JOB", job_str.str()); + } + } + catch(const LauncherException &ex){ + INFOS(ex.msg.c_str()); + THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR); + } + + return jobId; +} + //============================================================================= /*! CORBA Method: * Create a job in the launcher with a file diff --git a/src/Launcher/SALOME_Launcher.hxx b/src/Launcher/SALOME_Launcher.hxx index a04a5d48e..4c46a6399 100644 --- a/src/Launcher/SALOME_Launcher.hxx +++ b/src/Launcher/SALOME_Launcher.hxx @@ -56,6 +56,8 @@ public: CORBA::Boolean getJobWorkFile(CORBA::Long job_id, const char * work_file, const char * directory); void stopJob (CORBA::Long job_id); void removeJob (CORBA::Long job_id); + char * dumpJob(CORBA::Long job_id); + CORBA::Long restoreJob(const char * dumpedJob); // Useful methods CORBA::Long createJobWithFile(const char * xmlExecuteFile, const char * clusterName); diff --git a/src/ResourcesManager/ResourcesManager.cxx b/src/ResourcesManager/ResourcesManager.cxx index 6ce6d140e..0603eb51d 100644 --- a/src/ResourcesManager/ResourcesManager.cxx +++ b/src/ResourcesManager/ResourcesManager.cxx @@ -53,7 +53,7 @@ resourceParams::resourceParams() : can_launch_batch_jobs(false), can_run_containers(false), nb_proc(-1), - nb_node(-1), + nb_node(0), nb_proc_per_node(-1), cpu_clock(-1), mem_mb(-1) -- 2.39.2