for cmd in command:
single_cmd = cmd.strip().split(' ')
any_error = False
+ error_code = 1
try:
proc = subprocess.Popen(single_cmd)
(stdoutdata, stderrdata) = proc.communicate() # Wait for process to terminate
if proc.returncode != 0:
any_error = True
+ error_code = proc.returncode
except:
any_error = True
pass
sys.stdout.write("".join(outmsg))
if errmsg:
sys.stderr.write("".join(errmsg))
- sys.exit(1)
+ sys.exit(error_code)
return 0
else:
//! Kill the job and remove it from the jobs list
void removeJob (in long job_id) raises (SALOME::SALOME_Exception);
+ //! Get the job's serialization string
+ string dumpJob(in long job_id) raises (SALOME::SALOME_Exception);
+ //! Create a job from its serialization string
+ /*! \param dumpedJob Serialization string returned by dumpJob.
+ \return Job id
+ */
+ long restoreJob(in string dumpedJob) raises (SALOME::SALOME_Exception);
+
// Useful methods
long createJobWithFile(in string xmlJobFile, in string clusterName) raises (SALOME::SALOME_Exception);
boolean testBatch (in ResourceParameters params) raises (SALOME::SALOME_Exception);
void loadJobs(in string jobs_file) raises (SALOME::SALOME_Exception);
//! Save the current list of jobs in an xml file.
void saveJobs(in string jobs_file) raises (SALOME::SALOME_Exception);
-
};
};
SALOME_Launcher_Parser.hxx
SALOME_Launcher_defs.hxx
)
+
+SET(LAUNCHER_PYTHON_SCRIPTS
+ launcher_proxy.py
+)
+
INSTALL(FILES ${COMMON_HEADERS_HXX} DESTINATION ${SALOME_INSTALL_HEADERS})
INSTALL(FILES testLauncher.xml DESTINATION ${SALOME_KERNEL_INSTALL_RES_DATA})
+SALOME_INSTALL_SCRIPTS("${LAUNCHER_PYTHON_SCRIPTS}" ${SALOME_INSTALL_PYTHON})
{
LAUNCHER_MESSAGE("Launcher_cpp constructor");
_job_cpt = 0;
- _job_cpt_mutex = new pthread_mutex_t();
- pthread_mutex_init(_job_cpt_mutex, NULL);
}
//=============================================================================
for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
delete it1->second;
#endif
-
- pthread_mutex_destroy(_job_cpt_mutex);
- delete _job_cpt_mutex;
}
#ifdef WITH_LIBBATCH
{
LAUNCHER_MESSAGE("Creating a new job");
// Add job to the jobs map
- pthread_mutex_lock(_job_cpt_mutex);
new_job->setNumber(_job_cpt);
_job_cpt++;
- pthread_mutex_unlock(_job_cpt_mutex);
std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
if (it_job == _launcher_job_map.end())
+ {
_launcher_job_map[new_job->getNumber()] = new_job;
+ }
else
{
- LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
+ LAUNCHER_INFOS("A job has already the same id: " << new_job->getNumber());
delete new_job;
- throw LauncherException("A job as already the same id - job is not created !");
+ throw LauncherException("A job has already the same id - job is not created !");
}
LAUNCHER_MESSAGE("New Job created");
}
{
LAUNCHER_MESSAGE("Launch a job");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ // Check if job exists
+ Launcher::Job * job = findJob(job_id);
// Check job state (cannot launch a job already launched...)
if (job->getState() != "CREATED")
throw LauncherException("Bad state of the job: " + job->getState());
}
- // Third step search batch manager for the job into the map -> instantiate one if does not exist
-#ifdef WITH_LIBBATCH
- std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
- if(it == _batchmap.end())
- {
- createBatchManagerForJob(job);
- }
-#endif
+ Batch::BatchManager * bm = getBatchManager(job);
try {
- Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
+ Batch::JobId batch_manager_job_id = bm->submitJob(*(job->getBatchJob()));
job->setBatchManagerJobId(batch_manager_job_id);
job->setState("QUEUED");
job->setReference(batch_manager_job_id.getReference());
LAUNCHER_MESSAGE("Get job state");
// Check if job exists
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string state;
try
LAUNCHER_MESSAGE("Get job assigned hostnames");
// Check if job exists
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string assigned_hostnames = job->getAssignedHostnames();
return assigned_hostnames.c_str();
{
LAUNCHER_MESSAGE("Get Job results");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string resource_name = job->getResourceDefinition().Name;
try
{
{
LAUNCHER_MESSAGE("Clear the remote working directory");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
try
{
_batchmap[job_id]->clearWorkingDir(*(job->getBatchJob()));
bool rtn;
LAUNCHER_MESSAGE("Get Job dump state");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string resource_name = job->getResourceDefinition().Name;
try
{
bool rtn;
LAUNCHER_MESSAGE("Get working file " << work_file);
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string resource_name = job->getResourceDefinition().Name;
try
{
{
LAUNCHER_MESSAGE("Stop Job");
- // Check if job exist
- std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
+ Launcher::Job * job = findJob(job_id);
+ job->stopJob();
+}
+
+// Return the serialization string of the job registered under job_id.
+// The lookup goes through findJob(), which throws LauncherException when
+// no job has this id; the string is built by XML_Persistence::dumpJob.
+std::string
+Launcher_cpp::dumpJob(int job_id)
+{
+  LAUNCHER_MESSAGE("dump Job");
+
+  // The job pointer is only needed for this single call.
+  return Launcher::XML_Persistence::dumpJob(*findJob(job_id));
+}
+
+// Re-create a job from a serialization string (see dumpJob) and register it
+// through addJob().
+// Returns the id of the new job, or -1 when the string yields no job, when
+// addJob() rejects it, or when a LauncherException is caught (it is logged,
+// not propagated).
+int
+Launcher_cpp::restoreJob(const std::string& dumpedJob)
+{
+  LAUNCHER_MESSAGE("restore Job");
+  int restored_id = -1;
+  Launcher::Job * job = NULL;
+  try
{
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
+    job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
+    if (job != NULL)
+    {
+      restored_id = addJob(job);
+      // A negative id means addJob() did not store the job: release it here.
+      if (restored_id < 0)
+        delete job;
+    }
}
-
- it_job->second->stopJob();
+  catch (const LauncherException & error)
+  {
+    LAUNCHER_INFOS("Cannot load the job. Exception: " << error.msg.c_str());
+    // NOTE(review): createJob() deletes the job before throwing on a
+    // duplicate id, so this delete may free it a second time - confirm who
+    // owns the job on each throwing path.
+    delete job; // no-op while job is still NULL
+  }
+  return restored_id;
}
//=============================================================================
"(libBatch was not present at compilation time)");
}
+// Variant used when the launcher is compiled without LIBBATCH: dumping a job
+// is not supported, so the call is logged and always ends with a
+// LauncherException. No return statement is needed after the throw.
+std::string
+Launcher_cpp::dumpJob(int job_id)
+{
+  LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot dump job!!!");
+  throw LauncherException("Method Launcher_cpp::dumpJob is not available "
+                          "(libBatch was not present at compilation time)");
+}
+
+// Variant used when the launcher is compiled without LIBBATCH: restoring a
+// job is not supported, so the call is logged and always ends with a
+// LauncherException. No return statement is needed after the throw.
+int
+Launcher_cpp::restoreJob(const std::string& dumpedJob)
+{
+  LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot restore job!!!");
+  throw LauncherException("Method Launcher_cpp::restoreJob is not available "
+                          "(libBatch was not present at compilation time)");
+}
+
long
Launcher_cpp::createJobWithFile( const std::string xmlExecuteFile, std::string clusterName)
{
return _launcher_job_map;
}
-void
-Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
+#ifdef WITH_LIBBATCH
+Batch::BatchManager*
+Launcher_cpp::getBatchManager(Launcher::Job * job)
{
+ Batch::BatchManager* result = nullptr;
int job_id = job->getNumber();
// Select a resource for the job
}
// Step 2: We can now add a Factory if the resource is correctly defined
-#ifdef WITH_LIBBATCH
std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
if(it == _batchmap.end())
{
// Warning cannot write on one line like this, because map object is constructed before
// the method is called...
//_batchmap[job_id] = FactoryBatchManager(resource_definition);
- Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
- _batchmap[job_id] = batch_client;
+ result = FactoryBatchManager(resource_definition);
+ _batchmap[job_id] = result;
}
catch(const LauncherException &ex)
{
throw LauncherException(ex.message);
}
}
-#endif
+ else
+ result = it->second;
+ return result;
}
+#endif
void
Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
{
// Step 0: Calculated job_id
- pthread_mutex_lock(_job_cpt_mutex);
- int job_id = _job_cpt;
+ new_job->setNumber(_job_cpt);
_job_cpt++;
- new_job->setNumber(job_id);
- pthread_mutex_unlock(_job_cpt_mutex);
+#ifdef WITH_LIBBATCH
// Step 1: check if resource is already in the map
- createBatchManagerForJob(new_job);
+ Batch::BatchManager * bm = getBatchManager(new_job);
// Step 2: add the job to the batch manager
-#ifdef WITH_LIBBATCH
try
{
- Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
- new_job->getReference());
+ Batch::JobId batch_manager_job_id = bm->addJob(*(new_job->getBatchJob()),
+ new_job->getReference());
new_job->setBatchManagerJobId(batch_manager_job_id);
}
catch(const Batch::GenericException &ex)
// Step 3: add job to launcher map
std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
if (it_job == _launcher_job_map.end())
+ {
_launcher_job_map[new_job->getNumber()] = new_job;
+ }
else
{
LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
#endif
}
+// Register new_job in the launcher according to the state stored in the job.
+//  - "CREATED": the job goes through createJob(); run_part information is
+//    ignored.
+//  - "QUEUED", "RUNNING", "IN_PROCESS", "PAUSED": the job is added with
+//    addJobDirectlyToMap(); with LIBBATCH its state is set to "ERROR" when the
+//    batch manager reference differs from the reference stored in the job.
+//  - "FINISHED", "FAILED", "ERROR": the job is added with
+//    addJobDirectlyToMap(), keeping run_part information.
+// Returns the job number, or -1 for any other state. In that case the job has
+// not been stored and the callers in this file delete it.
+int
+Launcher_cpp::addJob(Launcher::Job * new_job)
+{
+  const string state = new_job->getState();
+
+  // Never submitted: plain creation.
+  if (state == "CREATED")
+  {
+    createJob(new_job);
+    return new_job->getNumber();
+  }
+
+  const bool is_active = state == "QUEUED" ||
+                         state == "RUNNING" ||
+                         state == "IN_PROCESS" ||
+                         state == "PAUSED";
+  const bool is_over = state == "FINISHED" ||
+                       state == "FAILED" ||
+                       state == "ERROR";
+  if (!is_active && !is_over)
+  {
+    LAUNCHER_INFOS("A bad job is found, state unknown " << state);
+    return -1;
+  }
+
+  // Already submitted once: both families are stored the same way.
+  addJobDirectlyToMap(new_job);
+  const int jobId = new_job->getNumber();
+
+#ifdef WITH_LIBBATCH
+  // We check that the BatchManager could resume the job
+  if (is_active &&
+      new_job->getBatchManagerJobId().getReference() != new_job->getReference())
+  {
+    LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR");
+    new_job->setState("ERROR");
+  }
+#endif
+  return jobId;
+}
+
list<int>
Launcher_cpp::loadJobs(const char* jobs_file)
{
for (it_job = jobs_list.begin(); it_job != jobs_list.end(); it_job++)
{
Launcher::Job * new_job = *it_job;
- string job_state = new_job->getState();
-
+ int jobId = -1;
try
{
- if (job_state == "CREATED")
- {
- // In this case, we ignore run_part information
- createJob(new_job);
- new_jobs_id_list.push_back(new_job->getNumber());
- }
- else if (job_state == "QUEUED" ||
- job_state == "RUNNING" ||
- job_state == "IN_PROCESS" ||
- job_state == "PAUSED")
- {
- addJobDirectlyToMap(new_job);
- new_jobs_id_list.push_back(new_job->getNumber());
-
- // Step 4: We check that the BatchManager could resume
- // the job
-#ifdef WITH_LIBBATCH
- if (new_job->getBatchManagerJobId().getReference() != new_job->getReference())
- {
- LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR");
- new_job->setState("ERROR");
- }
-#endif
- }
- else if (job_state == "FINISHED" ||
- job_state == "FAILED" ||
- job_state == "ERROR")
- {
- // Step 2: We add run_part information
- addJobDirectlyToMap(new_job);
- new_jobs_id_list.push_back(new_job->getNumber());
- }
+ jobId = addJob(new_job);
+ if(jobId >= 0)
+ new_jobs_id_list.push_back(jobId);
else
- {
- LAUNCHER_INFOS("A bad job is found, state unknown " << job_state);
delete new_job;
- }
}
catch(const LauncherException &ex)
{
{
// Create a sorted list from the internal job map
list<const Launcher::Job *> jobs_list;
- for (int i=0; i<_job_cpt; i++)
{
- map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
- if (it_job != _launcher_job_map.end())
- jobs_list.push_back(it_job->second);
+ for (int i=0; i<_job_cpt; i++)
+ {
+ map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
+ if (it_job != _launcher_job_map.end())
+ jobs_list.push_back(it_job->second);
+ }
}
-
// Save the jobs in XML file
Launcher::XML_Persistence::saveJobs(jobs_file, jobs_list);
}
+
+// Look up a job in the launcher job map.
+// Returns the job stored under job_id; logs and throws LauncherException when
+// the map has no entry for this id.
+Launcher::Job *
+Launcher_cpp::findJob(int job_id)
+{
+  std::map<int, Launcher::Job *>::const_iterator found = _launcher_job_map.find(job_id);
+  if (found != _launcher_job_map.end())
+    return found->second;
+
+  LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
+  throw LauncherException("Cannot find the job, is it created ?");
+}
\ No newline at end of file
#include <vector>
#include <list>
-#include <pthread.h>
-
class MpiImpl;
namespace Batch{
bool getJobWorkFile(int job_id, std::string work_file, std::string directory);
void stopJob(int job_id);
void removeJob(int job_id);
+ std::string dumpJob(int job_id);
+ int restoreJob(const std::string& dumpedJob);
/*! Load the jobs from the file "jobs_file" and add them to the Launcher.
* Return a list with the IDs of the jobs that were successfully loaded.
// Useful methods
long createJobWithFile(std::string xmlExecuteFile, std::string clusterName);
std::map<int, Launcher::Job *> getJobs();
- void createBatchManagerForJob(Launcher::Job * job);
void addJobDirectlyToMap(Launcher::Job * new_job);
// Lib methods
protected:
virtual void notifyObservers(const std::string & event_name, const std::string & event_data) {}
+ int addJob(Launcher::Job * new_job);
+
+ Launcher::Job * findJob(int job_id);
// Methods used by user interface methods
#ifdef WITH_LIBBATCH
Batch::BatchManager *FactoryBatchManager(ParserResourcesType& params);
std::map <int, Batch::BatchManager*> _batchmap;
+ Batch::BatchManager* getBatchManager(Launcher::Job * job);
#endif
ParserLauncherType ParseXmlFile(std::string xmlExecuteFile);
std::map <int, Launcher::Job *> _launcher_job_map;
int _job_cpt; // job number counter
- pthread_mutex_t * _job_cpt_mutex; // mutex for job counter
};
#endif
#ifndef WIN32
launch_script_stream << "#!/bin/sh -f" << std::endl;
launch_script_stream << "cd " << work_directory << std::endl;
+ // remove the exit code from any previous execution
+ launch_script_stream << "rm -f logs/exit_code.log" << std::endl;
launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl;
launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl;
if (_env_file != "")
}
#endif
launch_script_stream << runCommandString() << std::endl;
+#ifndef WIN32
+ // log the exit code
+ launch_script_stream << "echo $? > logs/exit_code.log" << std::endl;
+#endif
// Return
launch_script_stream.flush();
// Begin of script
launch_script_stream << "#!/bin/sh -f" << std::endl;
launch_script_stream << "cd " << work_directory << std::endl;
+ // remove the exit code from any previous execution
+ launch_script_stream << "rm -f logs/exit_code.log" << std::endl;
+
launch_script_stream << "export PYTHONPATH=" << work_directory << ":$PYTHONPATH" << std::endl;
launch_script_stream << "export PATH=" << work_directory << ":$PATH" << std::endl;
if (_env_file != "")
// Call real job type
addJobTypeSpecificScript(launch_script_stream);
+ // log the exit code
+ launch_script_stream << "echo $? > logs/exit_code.log" << std::endl;
// End
launch_script_stream << _resource_definition.AppliPath << "/salome kill \"$appli_port\"" << std::endl;
xmlFree(xmlStrValue);
}
+// Build a job from an XML string such as the one returned by dumpJob().
+// The document must have a "jobs" root element; the first "job" child is
+// converted with createJobFromXmlNode().
+// Returns the new job, or NULL when the root has no "job" child.
+// Throws LauncherException when the string cannot be parsed or when the root
+// element is missing or is not "jobs".
+Job*
+XML_Persistence::createJobFromString(const std::string& jobDump)
+{
+  // xmlReadMemory takes the buffer size as an int.
+  xmlDocPtr doc = xmlReadMemory(jobDump.c_str(), static_cast<int>(jobDump.length()),
+                                "noname.xml", NULL, 0);
+  if (doc == NULL)
+  {
+    std::string error = "Error in xmlReadMemory in XML_Persistence::createJobFromString, could not parse string: " + jobDump;
+    LAUNCHER_INFOS(error);
+    throw LauncherException(error);
+  }
+
+  // xmlDocGetRootElement may return NULL, so check it before reading the name.
+  xmlNodePtr root_node = xmlDocGetRootElement(doc);
+  if (root_node == NULL || xmlStrToString(root_node->name) != "jobs")
+  {
+    xmlFreeDoc(doc);
+    std::string error = "Error while parsing job dump: " + jobDump;
+    LAUNCHER_INFOS(error);
+    throw LauncherException(error);
+  }
+
+  // Find the first job
+  Job * result = NULL;
+  try
+  {
+    for (xmlNodePtr node = root_node->xmlChildrenNode;
+         node != NULL && result == NULL;
+         node = node->next)
+    {
+      if (xmlStrToString(node->name) == "job")
+      {
+        LAUNCHER_INFOS("A job is found");
+        result = createJobFromXmlNode(node);
+      }
+    }
+  }
+  catch (...)
+  {
+    // createJobFromXmlNode is defined elsewhere; if it throws, the document
+    // must still be released before the exception leaves this function.
+    xmlFreeDoc(doc);
+    throw;
+  }
+
+  // Clean
+  xmlFreeDoc(doc);
+  return result;
+}
+
+// Serialize one job into an XML string: a "jobs" root element preceded by a
+// comment, with the job added by addJobToXmlDocument().
+// Returns the formatted document, or an empty string when libxml2 produces no
+// output.
+std::string
+XML_Persistence::dumpJob(const Job& job)
+{
+  // Initialization
+  // xmlNewDoc, xmlNewNode and xmlNewDocComment copy their string argument, so
+  // the literals are passed directly; duplicating them with xmlCharStrdup
+  // would leak one buffer per call.
+  xmlKeepBlanksDefault(0);
+  xmlDocPtr doc = xmlNewDoc(reinterpret_cast<const xmlChar *>("1.0"));
+  xmlNodePtr root_node = xmlNewNode(NULL, reinterpret_cast<const xmlChar *>("jobs"));
+  xmlDocSetRootElement(doc, root_node);
+  xmlNodePtr doc_comment = xmlNewDocComment(doc, reinterpret_cast<const xmlChar *>("SALOME Launcher job"));
+  xmlAddPrevSibling(root_node, doc_comment);
+
+  addJobToXmlDocument(root_node, job);
+
+  // Final step: write to result
+  xmlChar *xmlbuff = NULL;
+  int buffersize = 0;
+  xmlDocDumpFormatMemory(doc, &xmlbuff, &buffersize, 1);
+  std::string result;
+  if (xmlbuff != NULL && buffersize > 0)
+    result.assign(reinterpret_cast<const char *>(xmlbuff), buffersize);
+
+  // Clean
+  xmlFree(xmlbuff);
+  xmlFreeDoc(doc);
+  return result;
+}
+
}
//! Save the jobs in the list "jobs_list" to the XML file "jobs_file".
static void saveJobs(const char* jobs_file, const std::list<const Job *> & jobs_list);
+ static Job* createJobFromString(const std::string& jobDump);
+ static std::string dumpJob(const Job& job);
+
private:
// This class is static only, not instantiable
XML_Persistence() {}
}
}
+// CORBA method: return the serialization string of a job.
+// The string is produced by the underlying launcher object and duplicated
+// with CORBA::string_dup for the caller. A LauncherException is logged and
+// reported as a SALOME CORBA exception of type SALOME::BAD_PARAM.
+char *
+SALOME_Launcher::dumpJob(CORBA::Long job_id)
+{
+  std::string dump;
+  try
+  {
+    dump = _l.dumpJob(job_id);
+  }
+  catch (const LauncherException & error)
+  {
+    INFOS(error.msg.c_str());
+    THROW_SALOME_CORBA_EXCEPTION(error.msg.c_str(),SALOME::BAD_PARAM);
+  }
+  return CORBA::string_dup(dump.c_str());
+}
+
+// CORBA method: re-create a job from a serialization string (see dumpJob).
+// When the underlying launcher returns an id >= 0, observers are notified
+// with the "NEW_JOB" event and the id as data. A LauncherException is logged
+// and reported as a SALOME CORBA exception of type SALOME::INTERNAL_ERROR.
+// Returns the id given by the underlying launcher (negative on failure).
+CORBA::Long
+SALOME_Launcher::restoreJob(const char * dumpedJob)
+{
+  // Start from the failure value so that the result is defined on every
+  // path, whatever the exception macro below expands to.
+  CORBA::Long jobId = -1;
+  try
+  {
+    jobId = _l.restoreJob(dumpedJob);
+    if (jobId >= 0)
+    {
+      std::ostringstream job_str;
+      job_str << jobId;
+      notifyObservers("NEW_JOB", job_str.str());
+    }
+  }
+  catch (const LauncherException &ex)
+  {
+    INFOS(ex.msg.c_str());
+    THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::INTERNAL_ERROR);
+  }
+
+  return jobId;
+}
+
//=============================================================================
/*! CORBA Method:
* Create a job in the launcher with a file
CORBA::Boolean getJobWorkFile(CORBA::Long job_id, const char * work_file, const char * directory);
void stopJob (CORBA::Long job_id);
void removeJob (CORBA::Long job_id);
+ char * dumpJob(CORBA::Long job_id);
+ CORBA::Long restoreJob(const char * dumpedJob);
// Useful methods
CORBA::Long createJobWithFile(const char * xmlExecuteFile, const char * clusterName);
}
try
{
- SALOME_Launcher *lServ(new SALOME_Launcher(orb,root_poa));
+ CORBA::PolicyList policies;
+ policies.length(1);
+ PortableServer::ThreadPolicy_var threadPol(root_poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL));
+ policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol);
+ PortableServer::POA_var safePOA = root_poa->create_POA("SingleThreadPOA",pman,policies);
+ threadPol->destroy();
+
+ SALOME_Launcher *lServ(new SALOME_Launcher(orb,safePOA));
lServ->_remove_ref();
//
SALOMESDS::DataServerManager *dsm(new SALOMESDS::DataServerManager(argc,argv,orb,root_poa));
INSTALL(FILES CTestTestfileInstall.cmake
DESTINATION ${KERNEL_TEST_DIR}/Launcher
RENAME CTestTestfile.cmake)
+ SET(STRESS_TEST_FILES
+ test_stress.sh
+ launcher_use_case.py
+ )
+ INSTALL(PROGRAMS ${STRESS_TEST_FILES}
+ DESTINATION ${KERNEL_TEST_DIR}/Launcher)
ENDIF()
IF(NOT WIN32)
SET(TEST_NAME ${COMPONENT_NAME}_Launcher)
ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 test_launcher.py)
+ SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}"
+ # TIMEOUT 500
+ )
+
+ SET(TEST_NAME ${COMPONENT_NAME}_StressLauncher)
+ ADD_TEST(${TEST_NAME} python ${SALOME_TEST_DRIVER} 2000 ./test_stress.sh)
SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES LABELS "${COMPONENT_NAME}"
# TIMEOUT 500
)
--- /dev/null
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# classic use case of a job
+import os
+import salome
+import tempfile
+import time
+import sys
+
+if __name__ == '__main__':
+ salome.salome_init()
+ launcher = salome.naming_service.Resolve('/SalomeLauncher')
+ job_params = salome.JobParameters()
+ job_params.resource_required = salome.ResourceParameters()
+ job_params.resource_required.name = "localhost"
+ job_params.resource_required.nb_proc = 1 # slurm: --ntasks
+
+ job_params.job_type = "command"
+ #cwd = os.getcwd()
+ cwd = sys.argv[1]
+ case_dir = tempfile.mkdtemp(prefix="test")
+ job_params.local_directory = cwd
+ job_params.job_file = "command.sh"
+ job_params.work_directory = os.path.join(case_dir, "run")
+ job_params.result_directory = os.path.join(case_dir, "result")
+ job_params.out_files = ["result.txt"]
+ job_params.wckey="P11U5:CARBONES"
+ job_params.job_name = "MyJob"
+ job_id = launcher.createJob(job_params)
+ launcher.launchJob(job_id)
+ job_string = launcher.dumpJob(job_id)
+ jobState = launcher.getJobState(job_id)
+ while jobState != 'FINISHED' and jobState != 'FAILED':
+ time.sleep(1)
+ jobState = launcher.getJobState(job_id)
+ launcher.getJobResults(job_id, '')
+ launcher.getJobWorkFile(job_id, "result.txt", '')
+ launcher.clearJobWorkingDir(job_id)
+ launcher.removeJob(job_id)
+ new_id = launcher.restoreJob(job_string)
+ job_params_bis = launcher.getJobParameters(new_id)
+ jobState = launcher.getJobState(new_id)
+ launcher.removeJob(new_id)
+ launcher.saveJobs(os.path.join(case_dir,"savejobs.xml"))
--- /dev/null
+#! /bin/bash
+# This test launches in parallel a greate number of instancies of a usual use case
+WORKDIR=`mktemp -d`
+echo WORKDIR: $WORKDIR
+cat > $WORKDIR/command.sh <<< 'echo "OK" > result.txt'
+chmod 755 $WORKDIR/command.sh
+for i in {1..500}
+do
+ python launcher_use_case.py $WORKDIR 2> $WORKDIR/log$i.err &
+done
+exit_code=0
+for i in {1..500}
+do
+ wait -n
+ ret=$?
+ if [ $ret -ne "0" ]
+ then
+ echo "Error detected!"
+ exit_code=1
+ fi
+done
+# list of error files not empty
+ls -l $WORKDIR/*.err | awk '{if ($5 != "0") print $0}'
+exit $exit_code
\ No newline at end of file
--- /dev/null
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2017-2018 CEA/DEN, EDF R&D
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
+#
+""" Easier access to SalomeLauncher"""
+
+import os
+import salome
+import time
+
+JOB_FILE_NAME = "jobDump.xml"
+
+class Job(object):
+ """
+ This class makes an easier access to SalomeLauncher.
+ It adds an automatic save of the job's parameters after the launch. The save
+ is done into the result directory of the job. It is then possible to reload
+ the job from that file. It also provides a waiting loop for the job to finish.
+ This proxy also hides the calls to the naming service in order to get the
+ instance of SalomeLauncher.
+ """
+
+ @staticmethod
+ def launch(job_params):
+ """ Create, submit and dump a new job to result_dir."""
+ myjob = Job()
+ launcher = salome.naming_service.Resolve('/SalomeLauncher')
+ myjob.launcher = launcher
+ result_dir = job_params.result_directory # local path where to copy out_files
+
+ myjob.job_id = launcher.createJob(job_params) #SALOME id of the job
+ launcher.launchJob(myjob.job_id) # copy files, run pre_command, submit job
+ myjob.saveJob(result_dir)
+ return myjob
+
+ @staticmethod
+ def reloadJob(result_dir):
+ """ Create a new job from a job dumped in result_dir."""
+ myjob = Job()
+ launcher = salome.naming_service.Resolve('/SalomeLauncher')
+ myjob.launcher = launcher
+ myjob.job_id = -1
+ try:
+ job_file_path = os.path.join(result_dir, JOB_FILE_NAME)
+ job_string = ""
+ with open(job_file_path, "r") as f:
+ job_string = f.read()
+ myjob.job_id = launcher.restoreJob(job_string)
+ except:
+ myjob = None
+ return myjob
+
+ def saveJob(self, result_dir):
+ job_string = self.launcher.dumpJob(self.job_id)
+ job_file_path = os.path.join(result_dir, JOB_FILE_NAME)
+ if not os.path.exists(result_dir):
+ os.makedirs(result_dir)
+ with open(job_file_path, "w") as f:
+ f.write(job_string)
+
+ def wait(self, sleep_delay=10):
+ """ Wait for the end of the job """
+ launcher = self.launcher
+ job_id = self.job_id
+ jobState = launcher.getJobState(job_id)
+ while jobState != "FINISHED" and jobState != "FAILED" :
+ time.sleep(sleep_delay)
+ jobState = launcher.getJobState(job_id)
+ print ("Job %d state: %s" % (job_id,jobState))
+
+ def verify(self):
+ """ Get the return code of the solver if the job is finished.
+ If the job is not finished (submission error or another state),
+ return an empty string.
+ """
+ launcher = self.launcher
+ job_id = self.job_id
+ jobState = launcher.getJobState(job_id)
+ exit_code = ""
+ if jobState != "FINISHED" :
+ print ("Job has not finished correctly.")
+ print ("Job %d state: %s" % (job_id,jobState))
+ else :
+ job_params = launcher.getJobParameters(job_id)
+ temp_dir = job_params.result_directory
+ temp_log_dir = os.path.join(temp_dir, "logs")
+ if(launcher.getJobWorkFile(job_id, "logs/exit_code.log", temp_log_dir)):
+ exit_code_file = os.path.join(temp_log_dir, "exit_code.log")
+ if os.path.isfile(exit_code_file):
+ with open(exit_code_file) as myfile:
+ exit_code = myfile.read()
+ return exit_code.strip()
+
+ def getResults(self):
+ """ Copy the result files from remote working_directory
+ to the local result_directory."""
+ self.launcher.getJobResults(self.job_id, "")
+
+ def relaunch(self, script=""):
+ job_params = self.launcher.getJobParameters(self.job_id)
+ if script:
+ job_params.job_file = script
+ job_params.pre_command = ""
+ self.job_id = self.launcher.createJob(job_params)
+ self.launcher.launchJob(self.job_id)
+ self.saveJob(job_params.result_directory)
: can_launch_batch_jobs(false),
can_run_containers(false),
nb_proc(-1),
- nb_node(-1),
+ nb_node(0),
nb_proc_per_node(-1),
cpu_clock(-1),
mem_mb(-1)
ResourcesManager_cpp::AddResourceInCatalog(const ParserResourcesType & new_resource)
{
if (new_resource.Name == DEFAULT_RESOURCE_NAME){
- std::string error("Cannot modify default local resource \"" + DEFAULT_RESOURCE_NAME + "\"");
- throw ResourcesException(error);
+ ParserResourcesType default_resource = _resourcesList[DEFAULT_RESOURCE_NAME];
+ // some of the properties of the default resource shouldn't be modified
+ std::string check;
+ if( default_resource.HostName != new_resource.HostName)
+ check += "The Hostname property of the default resource can not be modified.\n";
+ if( default_resource.AppliPath != new_resource.AppliPath)
+ check += "The Applipath property of the default resource can not be modified.\n";
+ if( !new_resource.can_run_containers)
+ check += "The default resource should be able to run containers.\n";
+ if( !new_resource.can_launch_batch_jobs)
+ check += "The default resource should be able to launch batch jobs.\n";
+ if( default_resource.Protocol != new_resource.Protocol)
+ check += "The Protocol property of the default resource can not be modified.\n";
+ if(!check.empty())
+ throw ResourcesException(check);
}
// TODO - Add minimal check
_resourcesList[new_resource.Name] = new_resource;
RES_MESSAGE("WriteInXmlFile : start");
MapOfParserResourcesType resourceListToSave(_resourcesList);
- // We do not save default local resource because it is automatically created at startup
- resourceListToSave.erase(DEFAULT_RESOURCE_NAME);
if (resourceListToSave.empty())
{
- RES_MESSAGE("WriteInXmlFile: nothing to do, no resource except default \"" <<
- DEFAULT_RESOURCE_NAME << "\"");
+ RES_MESSAGE("WriteInXmlFile: nothing to do, no resource to save!");
return;
}