From: ribes Date: Fri, 23 Jul 2010 11:35:17 +0000 (+0000) Subject: - Launcher can now load and save its jobs list X-Git-Tag: V6_2_0a1~16 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=e4750cd3bd0c7fb1ef20592274ae01ef7bc59fc6;p=modules%2Fkernel.git - Launcher can now load and save its jobs list - Added introspection methods for Launcher --- diff --git a/idl/SALOME_ContainerManager.idl b/idl/SALOME_ContainerManager.idl index ed5c9796e..0392c3af5 100644 --- a/idl/SALOME_ContainerManager.idl +++ b/idl/SALOME_ContainerManager.idl @@ -153,6 +153,7 @@ exception NotFound {}; struct JobParameters { + string job_name; //! Job Type - Could be equal to "command" or "yacs_file" or "python_salome" string job_type; @@ -185,6 +186,18 @@ struct JobParameters Engines::ParameterList specific_parameters; }; +struct JobDescription +{ + long job_id; + Engines::JobParameters job_parameters; +}; +typedef sequence JobsList; + +interface SalomeLauncherObserver +{ + void notifyObserver(in string event_name, in string event_data); +}; + /*! \brief Interface of the %salomelauncher This interface is used for interaction with the unique instance of SalomeLauncher @@ -200,11 +213,21 @@ interface SalomeLauncher // Useful methods long createJobWithFile(in string xmlJobFile, in string clusterName) raises (SALOME::SALOME_Exception); - boolean testBatch (in ResourceParameters params) raises (SALOME::SALOME_Exception); + boolean testBatch (in ResourceParameters params) raises (SALOME::SALOME_Exception); // SALOME kernel service methods void Shutdown(); long getPID(); + + // Observer and introspection methods + void addObserver(in Engines::SalomeLauncherObserver observer); + Engines::JobsList getJobsList(); + Engines::JobParameters getJobParameters(in long job_id) raises (SALOME::SALOME_Exception); + + // Save and load methods + void loadJobs(in string jobs_file) raises (SALOME::SALOME_Exception); + void saveJobs(in string jobs_file) raises (SALOME::SALOME_Exception); + }; /*! \brief Interface of the %containerManager diff --git a/src/Launcher/Launcher.cxx b/src/Launcher/Launcher.cxx index e5d8e9dc8..c169487a2 100644 --- a/src/Launcher/Launcher.cxx +++ b/src/Launcher/Launcher.cxx @@ -421,7 +421,9 @@ Launcher_cpp::FactoryBatchManager(ParserResourcesType& params) throw LauncherException("Cannot find batch manager factory"); } LAUNCHER_MESSAGE("Instanciation of batch manager of type: " << bmType); - return (*fact)(hostname.c_str(), protocol, mpi.c_str(), nb_proc_per_node); + Batch::BatchManager_eClient * batch_client = (*fact)(hostname.c_str(), protocol, mpi.c_str(), nb_proc_per_node); + batch_client->setUsername(params.UserName); + return batch_client; } //---------------------------------------------------------- @@ -517,3 +519,87 @@ Launcher_cpp::ParseXmlFile(std::string xmlExecuteFile) return job_params; } +std::map +Launcher_cpp::getJobs() +{ + return _launcher_job_map; +} + +void +Launcher_cpp::checkFactoryForResource(const std::string & resource_name) +{ + // Step 1: Check if resource exist in the resource manager + ParserResourcesType resource_definition; + try + { + resource_definition = _ResManager->GetResourcesDescr(resource_name); + } + catch(const ResourcesException &ex) + { + LAUNCHER_INFOS(ex.msg); + throw LauncherException(ex.msg); + } + + // Step 2: We can now add a Factory is the resource is correctly define + std::map::const_iterator it = _batchmap.find(resource_name); + if(it == _batchmap.end()) + { + try + { + // Warning cannot write on one line like this, because map object is constructed before + // the method is called... + //_batchmap.[resource_name] = FactoryBatchManager(resource_definition); + Batch::BatchManager_eClient * batch_client = FactoryBatchManager(resource_definition); + _batchmap[resource_name] = batch_client; + } + catch(const LauncherException &ex) + { + LAUNCHER_INFOS("Error during creation of the batch manager of the resource, mess: " << ex.msg); + throw ex; + } + catch(const Batch::EmulationException &ex) + { + LAUNCHER_INFOS("Error during creation of the batch manager of the resource, mess: " << ex.message); + throw LauncherException(ex.message); + } + } +} + +void +Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job, const std::string job_reference) +{ + // Step 1: check if resource is already in the map + std::string resource_name = new_job->getResourceDefinition().Name; + checkFactoryForResource(resource_name); + ParserResourcesType resource_definition = _ResManager->GetResourcesDescr(resource_name); + new_job->setResourceDefinition(resource_definition); + + // Step 2: add the job to the batch manager + try + { + Batch::JobId batch_manager_job_id = _batchmap[resource_name]->addJob(*(new_job->getBatchJob()), + job_reference); + new_job->setBatchManagerJobId(batch_manager_job_id); + } + catch(const Batch::EmulationException &ex) + { + LAUNCHER_INFOS("Job is not launched, exception in submitJob: " << ex.message); + throw LauncherException(ex.message.c_str()); + } + + // Step 3: add job to launcher map + pthread_mutex_lock(_job_cpt_mutex); + new_job->setNumber(_job_cpt); + _job_cpt++; + pthread_mutex_unlock(_job_cpt_mutex); + std::map::const_iterator it_job = _launcher_job_map.find(new_job->getNumber()); + if (it_job == _launcher_job_map.end()) + _launcher_job_map[new_job->getNumber()] = new_job; + else + { + LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber()); + delete new_job; + throw LauncherException("A job as already the same id - job is not created !"); + } + LAUNCHER_MESSAGE("New job added"); +} diff --git a/src/Launcher/Launcher.hxx b/src/Launcher/Launcher.hxx index b15881795..eeed0d000 100644 --- a/src/Launcher/Launcher.hxx +++ b/src/Launcher/Launcher.hxx @@ -66,6 +66,9 @@ public: // Useful methods long createJobWithFile(std::string xmlExecuteFile, std::string clusterName); + std::map getJobs(); + void checkFactoryForResource(const std::string & resource_name); + void addJobDirectlyToMap(Launcher::Job * new_job, const std::string job_reference); // Lib methods void SetResourcesManager( ResourcesManager_cpp* rm ) {_ResManager = rm;} diff --git a/src/Launcher/Launcher_Job.cxx b/src/Launcher/Launcher_Job.cxx index 53b3c0e33..55ab16e19 100644 --- a/src/Launcher/Launcher_Job.cxx +++ b/src/Launcher/Launcher_Job.cxx @@ -29,6 +29,7 @@ Launcher::Job::Job() _launch_date = getLaunchDate(); _env_file = ""; + _job_name = ""; _job_file = ""; _job_file_name = ""; _job_file_name_complete = ""; @@ -46,6 +47,7 @@ Launcher::Job::Job() _resource_required_params.cpu_clock = -1; _resource_required_params.mem_mb = -1; _queue = ""; + _job_type = ""; #ifdef WITH_LIBBATCH _batch_job = new Batch::Job(); @@ -72,15 +74,36 @@ Launcher::Job::~Job() #endif } +std::string +Launcher::Job::getJobType() +{ + return _job_type; +} + +void +Launcher::Job::setJobName(const std::string & job_name) +{ + _job_name = job_name; +} + +std::string +Launcher::Job::getJobName() +{ + return _job_name; +} + void Launcher::Job::setState(const std::string & state) { // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED if (state != "CREATED" && + state != "IN_PROCESS" && state != "QUEUED" && state != "RUNNING" && + state != "PAUSED" && state != "FINISHED" && - state != "FAILED") + state != "FAILED" && + state != "ERROR") { throw LauncherException("Bad state, this state does not exist: " + state); } @@ -368,29 +391,20 @@ Launcher::Job::getLaunchDate() std::string Launcher::Job::updateJobState() { -#ifdef WITH_LIBBATCH - if (_batch_job_id.getReference() != "undefined") + + if (_state != "FINISHED" || + _state != "ERROR" || + _state != "FAILED") { - // A batch manager has been affected to the job - Batch::JobInfo job_info = _batch_job_id.queryJob(); - Batch::Parametre par = job_info.getParametre(); - - LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str()); - - // TODO: Remove this if all tests pass with the new libBatch, otherwise fix the codes in libBatch - // Patch until new LIBBATCH version - // eSSH Client and ePBS Client and eSGE -/* if (par[STATE].str() == "Running" or par[STATE].str() == "E" or par[STATE].str() == "R" or par[STATE].str() == "r" or par[STATE].str() == "RUN") - _state = "RUNNING"; - else if (par[STATE].str() == "Stopped") - _state = "PAUSED"; - else if (par[STATE].str() == "Done" or par[STATE].str() == "U" or par[STATE].str() == "e" or par[STATE].str() == "DONE" or par[STATE].str() == "EXIT") - _state = "FINISHED"; - else if (par[STATE].str() == "Dead" or par[STATE].str() == "Eqw") - _state = "ERROR"; - else if (par[STATE].str() == "Q" or par[STATE].str() == "qw" or par[STATE].str() == "PEN") - _state = "QUEUED";*/ - _state = par[Batch::STATE].str(); +#ifdef WITH_LIBBATCH + if (_batch_job_id.getReference() != "undefined") + { + // A batch manager has been affected to the job + Batch::JobInfo job_info = _batch_job_id.queryJob(); + Batch::Parametre par = job_info.getParametre(); + _state = par[Batch::STATE].str(); + LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str()); + } } #endif return _state; @@ -503,3 +517,66 @@ Launcher::Job::getBatchManagerJobId() return _batch_job_id; } #endif + +void +Launcher::Job::addToXmlDocument(xmlNodePtr root_node) +{ + // Begin job + xmlNodePtr job_node = xmlNewChild(root_node, NULL, xmlCharStrdup("job"), NULL); + xmlNewProp(job_node, xmlCharStrdup("type"), xmlCharStrdup(getJobType().c_str())); + xmlNewProp(job_node, xmlCharStrdup("name"), xmlCharStrdup(getJobName().c_str())); + + // Add user part + xmlNodePtr node = xmlNewChild(job_node, NULL, xmlCharStrdup("user_part"), NULL); + + xmlNewChild(node, NULL, xmlCharStrdup("job_file"), xmlCharStrdup(getJobFile().c_str())); + xmlNewChild(node, NULL, xmlCharStrdup("env_file"), xmlCharStrdup(getEnvFile().c_str())); + xmlNewChild(node, NULL, xmlCharStrdup("work_directory"), xmlCharStrdup(getWorkDirectory().c_str())); + xmlNewChild(node, NULL, xmlCharStrdup("local_directory"), xmlCharStrdup(getLocalDirectory().c_str())); + xmlNewChild(node, NULL, xmlCharStrdup("result_directory"), xmlCharStrdup(getResultDirectory().c_str())); + + // Files + xmlNodePtr files_node = xmlNewChild(node, NULL, xmlCharStrdup("files"), NULL); + std::list in_files = get_in_files(); + std::list out_files = get_out_files(); + for(std::list::iterator it = in_files.begin(); it != in_files.end(); it++) + xmlNewChild(files_node, NULL, xmlCharStrdup("in_file"), xmlCharStrdup((*it).c_str())); + for(std::list::iterator it = out_files.begin(); it != out_files.end(); it++) + xmlNewChild(files_node, NULL, xmlCharStrdup("out_file"), xmlCharStrdup((*it).c_str())); + + // Resource part + resourceParams resource_params = getResourceRequiredParams(); + xmlNodePtr res_node = xmlNewChild(node, NULL, xmlCharStrdup("resource_params"), NULL); + xmlNewChild(res_node, NULL, xmlCharStrdup("name"), xmlCharStrdup(resource_params.name.c_str())); + xmlNewChild(res_node, NULL, xmlCharStrdup("hostname"), xmlCharStrdup(resource_params.hostname.c_str())); + xmlNewChild(res_node, NULL, xmlCharStrdup("OS"), xmlCharStrdup(resource_params.OS.c_str())); + std::ostringstream nb_proc_stream; + std::ostringstream nb_node_stream; + std::ostringstream nb_proc_per_node_stream; + std::ostringstream cpu_clock_stream; + std::ostringstream mem_mb_stream; + nb_proc_stream << resource_params.nb_proc; + nb_node_stream << resource_params.nb_node; + nb_proc_per_node_stream << resource_params.nb_proc_per_node; + cpu_clock_stream << resource_params.cpu_clock; + mem_mb_stream << resource_params.mem_mb; + xmlNewChild(res_node, NULL, xmlCharStrdup("nb_proc"), xmlCharStrdup(nb_proc_stream.str().c_str())); + xmlNewChild(res_node, NULL, xmlCharStrdup("nb_node"), xmlCharStrdup(nb_node_stream.str().c_str())); + xmlNewChild(res_node, NULL, xmlCharStrdup("nb_proc_per_node"), xmlCharStrdup(nb_proc_per_node_stream.str().c_str())); + xmlNewChild(res_node, NULL, xmlCharStrdup("cpu_clock"), xmlCharStrdup(cpu_clock_stream.str().c_str())); + xmlNewChild(res_node, NULL, xmlCharStrdup("mem_mb"), xmlCharStrdup(mem_mb_stream.str().c_str())); + + xmlNewChild(node, NULL, xmlCharStrdup("maximum_duration"), xmlCharStrdup(getMaximumDuration().c_str())); + xmlNewChild(node, NULL, xmlCharStrdup("queue"), xmlCharStrdup(getQueue().c_str())); + + // Run part + xmlNodePtr run_node = xmlNewChild(job_node, NULL, xmlCharStrdup("run_part"), NULL); + xmlNewChild(run_node, NULL, xmlCharStrdup("job_state"), xmlCharStrdup(getState().c_str())); + ParserResourcesType resource_definition = getResourceDefinition(); + xmlNewChild(run_node, NULL, xmlCharStrdup("resource_choosed_name"), xmlCharStrdup(resource_definition.Name.c_str())); + +#ifdef WITH_LIBBATCH + Batch::JobId job_id = getBatchManagerJobId(); + xmlNewChild(run_node, NULL, xmlCharStrdup("job_reference"), xmlCharStrdup(job_id.getReference().c_str())); +#endif +} diff --git a/src/Launcher/Launcher_Job.hxx b/src/Launcher/Launcher_Job.hxx index 4f80bbede..f19e15a64 100644 --- a/src/Launcher/Launcher_Job.hxx +++ b/src/Launcher/Launcher_Job.hxx @@ -44,6 +44,8 @@ #include #endif +#include + namespace Launcher { class LAUNCHER_EXPORT Job @@ -62,8 +64,9 @@ namespace Launcher virtual void setResourceDefinition(const ParserResourcesType & resource_definition); ParserResourcesType getResourceDefinition(); - + // Common parameters + void setJobName(const std::string & job_name); virtual void setJobFile(const std::string & job_file); void setWorkDirectory(const std::string & work_directory); void setLocalDirectory(const std::string & local_directory); @@ -75,6 +78,7 @@ namespace Launcher void setQueue(const std::string & queue); void setEnvFile(const std::string & env_file); + std::string getJobName(); std::string getJobFile(); std::string getWorkDirectory(); std::string getLocalDirectory(); @@ -85,7 +89,8 @@ namespace Launcher resourceParams getResourceRequiredParams(); std::string getQueue(); std::string getEnvFile(); - + std::string getJobType(); + std::string updateJobState(); // Checks @@ -96,18 +101,24 @@ namespace Launcher long convertMaximumDuration(const std::string & maximum_duration); std::string getLaunchDate(); + // Xml method + void addToXmlDocument(xmlNodePtr root_node); + // Abstract class virtual void update_job() = 0; protected: int _number; + std::string _job_type; + std::string _state; std::string _launch_date; std::string _env_file; ParserResourcesType _resource_definition; + std::string _job_name; std::string _job_file; std::string _job_file_name; std::string _job_file_name_complete; diff --git a/src/Launcher/Launcher_Job_Command.cxx b/src/Launcher/Launcher_Job_Command.cxx index 46136964a..9b291ec37 100644 --- a/src/Launcher/Launcher_Job_Command.cxx +++ b/src/Launcher/Launcher_Job_Command.cxx @@ -21,7 +21,7 @@ // #include "Launcher_Job_Command.hxx" -Launcher::Job_Command::Job_Command() {} +Launcher::Job_Command::Job_Command() {_job_type = "command";} Launcher::Job_Command::~Job_Command() {} diff --git a/src/Launcher/Launcher_Job_PythonSALOME.cxx b/src/Launcher/Launcher_Job_PythonSALOME.cxx index f894e8508..2635cf47d 100644 --- a/src/Launcher/Launcher_Job_PythonSALOME.cxx +++ b/src/Launcher/Launcher_Job_PythonSALOME.cxx @@ -22,7 +22,7 @@ #include "Launcher_Job_PythonSALOME.hxx" -Launcher::Job_PythonSALOME::Job_PythonSALOME() {} +Launcher::Job_PythonSALOME::Job_PythonSALOME() {_job_type = "python_salome";} Launcher::Job_PythonSALOME::~Job_PythonSALOME() {} diff --git a/src/Launcher/Launcher_Job_YACSFile.cxx b/src/Launcher/Launcher_Job_YACSFile.cxx index 879f1f748..6b7b44186 100644 --- a/src/Launcher/Launcher_Job_YACSFile.cxx +++ b/src/Launcher/Launcher_Job_YACSFile.cxx @@ -22,7 +22,7 @@ #include "Launcher_Job_YACSFile.hxx" -Launcher::Job_YACSFile::Job_YACSFile() {} +Launcher::Job_YACSFile::Job_YACSFile() {_job_type = "yacs_file";} Launcher::Job_YACSFile::~Job_YACSFile() {} diff --git a/src/Launcher/SALOME_Launcher.cxx b/src/Launcher/SALOME_Launcher.cxx index e64999fb0..5b799f44e 100644 --- a/src/Launcher/SALOME_Launcher.cxx +++ b/src/Launcher/SALOME_Launcher.cxx @@ -39,6 +39,10 @@ #include #include +#include +#include +#include + const char *SALOME_Launcher::_LauncherNameInNS = "/SalomeLauncher"; //============================================================================= @@ -142,9 +146,8 @@ SALOME_Launcher::createJob(const Engines::JobParameters & job_parameters) // Queue std::string queue = job_parameters.queue.in(); - if (queue != "") - new_job->setQueue(queue); - + new_job->setQueue(queue); + // Resources requirements try { @@ -323,3 +326,576 @@ CORBA::Long SALOME_Launcher::getPID() (CORBA::Long)_getpid(); #endif } + +//============================================================================= +/*! CORBA Method: + * Add a new observer to the launcher + */ +//============================================================================= +void +SALOME_Launcher::addObserver(Engines::SalomeLauncherObserver_ptr observer) +{ +} + +//============================================================================= +/*! CORBA Method: + * Returns current launcher jobs list + */ +//============================================================================= +Engines::JobsList * +SALOME_Launcher::getJobsList() +{ + Engines::JobsList_var jobs_list = new Engines::JobsList(); + std::map cpp_jobs = _l.getJobs(); + std::map::const_iterator it_job; + int list_id = 0; + for(it_job = cpp_jobs.begin(); it_job != cpp_jobs.end(); it_job++) + { + int number = it_job->first; + try + { + // Prepare CORBA job description + Engines::JobDescription_var job_descr = new Engines::JobDescription(); + Engines::JobParameters_var job_parameters = getJobParameters(number); + job_descr->job_id = number; + job_descr->job_parameters = job_parameters; + + // Add job description to the sequence + jobs_list->length(list_id + 1); + jobs_list[list_id] = job_descr; + list_id++; + } + catch (...) {} + } + return jobs_list._retn(); +} + +//============================================================================= +/*! CORBA Method: + * Returns the job description + */ +//============================================================================= +Engines::JobParameters * +SALOME_Launcher::getJobParameters(CORBA::Long job_id) +{ + std::map cpp_jobs = _l.getJobs(); + std::map::const_iterator it_job = cpp_jobs.find(job_id); + if (it_job == cpp_jobs.end()) + { + INFOS("Cannot find the job, is it created ? job number: " << job_id); + THROW_SALOME_CORBA_EXCEPTION("Job does not exist", SALOME::INTERNAL_ERROR); + } + + Launcher::Job * job = it_job->second; + Engines::JobParameters_var job_parameters = new Engines::JobParameters; + job_parameters->job_name = CORBA::string_dup(job->getJobName().c_str()); + job_parameters->job_type = CORBA::string_dup(job->getJobType().c_str()); + job_parameters->job_file = CORBA::string_dup(job->getJobFile().c_str()); + job_parameters->env_file = CORBA::string_dup(job->getEnvFile().c_str()); + job_parameters->work_directory = CORBA::string_dup(job->getWorkDirectory().c_str()); + job_parameters->local_directory = CORBA::string_dup(job->getLocalDirectory().c_str()); + job_parameters->result_directory = CORBA::string_dup(job->getResultDirectory().c_str()); + + int i = 0; + int j = 0; + std::list in_files = job->get_in_files(); + std::list out_files = job->get_out_files(); + job_parameters->in_files.length(in_files.size()); + for(std::list::iterator it = in_files.begin(); it != in_files.end(); it++) + { + job_parameters->in_files[i] = CORBA::string_dup((*it).c_str()); + i++; + } + job_parameters->out_files.length(out_files.size()); + for(std::list::iterator it = out_files.begin(); it != out_files.end(); it++) + { + job_parameters->out_files[j] = CORBA::string_dup((*it).c_str()); + j++; + } + + job_parameters->maximum_duration = CORBA::string_dup(job->getMaximumDuration().c_str()); + job_parameters->queue = CORBA::string_dup(job->getQueue().c_str()); + + resourceParams resource_params = job->getResourceRequiredParams(); + job_parameters->resource_required.name = CORBA::string_dup(resource_params.name.c_str()); + job_parameters->resource_required.hostname = CORBA::string_dup(resource_params.hostname.c_str()); + job_parameters->resource_required.OS = CORBA::string_dup(resource_params.OS.c_str()); + job_parameters->resource_required.nb_proc = resource_params.nb_proc; + job_parameters->resource_required.nb_node = resource_params.nb_node; + job_parameters->resource_required.nb_proc_per_node = resource_params.nb_proc_per_node; + job_parameters->resource_required.cpu_clock = resource_params.cpu_clock; + job_parameters->resource_required.mem_mb = resource_params.mem_mb; + + return job_parameters._retn(); +} + +//============================================================================= +/*! CORBA Method: + * Loads jobs saved in jobs_file + */ +//============================================================================= +void +SALOME_Launcher::loadJobs(const char* jobs_file) +{ + // Step 1: check jobs_file read access + FILE* xml_file = fopen(jobs_file, "r"); + if (xml_file == NULL) + { + std::string error = "Error opening jobs_file in SALOME_Launcher::loadJobs: " + std::string(jobs_file); + INFOS(error); + THROW_SALOME_CORBA_EXCEPTION(error.c_str(), SALOME::INTERNAL_ERROR); + } + + // Step 2: read xml file + xmlDocPtr doc = xmlReadFile(jobs_file, NULL, 0); + if (doc == NULL) + { + std::string error = "Error in xmlReadFile in SALOME_Launcher::loadJobs, could not parse file: " + std::string(jobs_file); + INFOS(error); + fclose(xml_file); + THROW_SALOME_CORBA_EXCEPTION(error.c_str(), SALOME::INTERNAL_ERROR); + } + + // Step 3: Find jobs + xmlNodePtr root_node = xmlDocGetRootElement(doc); + xmlNodePtr xmlCurrentNode = root_node->xmlChildrenNode; + if (!xmlStrcmp(root_node->name, xmlCharStrdup("jobs"))) + { + while(xmlCurrentNode != NULL) + { + if (!xmlStrcmp(xmlCurrentNode->name, xmlCharStrdup("job"))) + { + INFOS("A job is found"); + Launcher::Job * new_job; // It is Launcher_cpp that is going to destroy it + xmlNodePtr job_node = xmlCurrentNode; + + // Begin Job + if (!xmlHasProp(job_node, xmlCharStrdup("type")) || + !xmlHasProp(job_node, xmlCharStrdup("name"))) + { + INFOS("A bad job is found, type or name not found"); + break; + } + xmlChar* type = xmlGetProp(job_node, xmlCharStrdup("type")); + xmlChar* name = xmlGetProp(job_node, xmlCharStrdup("name")); + std::string job_type((const char*) type); + if (job_type == "command") + new_job = new Launcher::Job_Command(); + else if (job_type == "yacs_file") + new_job = new Launcher::Job_YACSFile(); + else if (job_type == "python_salome") + new_job = new Launcher::Job_PythonSALOME(); + new_job->setJobName(std::string((const char *)name)); + xmlFree(type); + xmlFree(name); + + xmlNodePtr user_node = xmlFirstElementChild(job_node); + xmlNodePtr run_node = xmlNextElementSibling(user_node); + if (user_node == NULL || run_node == NULL) + { + INFOS("A bad job is found, user_part or run_part not found"); + delete new_job; + break; + } + if (xmlStrcmp(user_node->name, xmlCharStrdup("user_part")) || + xmlStrcmp(run_node->name, xmlCharStrdup("run_part"))) + { + INFOS("A bad job is found, cannot get a correct user_part or run_part node"); + delete new_job; + break; + } + + // Add user part + + // Get job_file env_file work_directory local_directory result_directory + xmlNodePtr job_file_node = xmlFirstElementChild(user_node); + xmlNodePtr env_file_node = xmlNextElementSibling(job_file_node); + xmlNodePtr work_directory_node = xmlNextElementSibling(env_file_node); + xmlNodePtr local_directory_node = xmlNextElementSibling(work_directory_node); + xmlNodePtr result_directory_node = xmlNextElementSibling(local_directory_node); + if (job_file_node == NULL || + env_file_node == NULL || + work_directory_node == NULL || + local_directory_node == NULL || + result_directory_node == NULL + ) + { + INFOS("A bad job is found, some user_part are not found"); + delete new_job; + break; + } + if (xmlStrcmp(job_file_node->name, xmlCharStrdup("job_file")) || + xmlStrcmp(env_file_node->name, xmlCharStrdup("env_file")) || + xmlStrcmp(work_directory_node->name, xmlCharStrdup("work_directory")) || + xmlStrcmp(local_directory_node->name, xmlCharStrdup("local_directory")) || + xmlStrcmp(result_directory_node->name, xmlCharStrdup("result_directory")) + ) + { + INFOS("A bad job is found, some user part node are not in the rigth or does not have a correct name"); + delete new_job; + break; + } + xmlChar* job_file = xmlNodeGetContent(job_file_node); + try + { + new_job->setJobFile(std::string((const char *)job_file)); + } + catch(const LauncherException &ex) + { + INFOS("Exception receice for job_file, cannot add the job" << ex.msg.c_str()); + delete new_job; + xmlFree(job_file); + break; + } + xmlChar* env_file = xmlNodeGetContent(env_file_node); + xmlChar* work_directory = xmlNodeGetContent(work_directory_node); + xmlChar* local_directory = xmlNodeGetContent(local_directory_node); + xmlChar* result_directory = xmlNodeGetContent(result_directory_node); + new_job->setEnvFile(std::string((const char *)env_file)); + new_job->setWorkDirectory(std::string((const char *)work_directory)); + new_job->setLocalDirectory(std::string((const char *)local_directory)); + new_job->setResultDirectory(std::string((const char *)result_directory)); + xmlFree(job_file); + xmlFree(env_file); + xmlFree(work_directory); + xmlFree(local_directory); + xmlFree(result_directory); + + // Get in and out files + xmlNodePtr files_node = xmlNextElementSibling(result_directory_node); + if (files_node == NULL) + { + INFOS("A bad job is found, user_part files is not found"); + delete new_job; + break; + } + if (xmlStrcmp(files_node->name, xmlCharStrdup("files"))) + { + INFOS("A bad job is found, files node are not in the rigth place or does not have a correct name or does not exist"); + delete new_job; + break; + } + xmlNodePtr file_node = xmlFirstElementChild(files_node); + while (file_node != NULL) + { + if (!xmlStrcmp(file_node->name, xmlCharStrdup("in_file"))) + { + xmlChar* in_file = xmlNodeGetContent(file_node); + new_job->add_in_file(std::string((const char *)in_file)); + xmlFree(in_file); + } + else if (!xmlStrcmp(file_node->name, xmlCharStrdup("out_file"))) + { + xmlChar* out_file = xmlNodeGetContent(file_node); + new_job->add_out_file(std::string((const char *)out_file)); + xmlFree(out_file); + } + file_node = xmlNextElementSibling(file_node); + } + + // Get resource part + xmlNodePtr res_node = xmlNextElementSibling(files_node); + xmlNodePtr maximum_duration_node = xmlNextElementSibling(res_node); + xmlNodePtr queue_node = xmlNextElementSibling(maximum_duration_node); + if (res_node == NULL || + maximum_duration_node == NULL || + queue_node == NULL + ) + { + INFOS("A bad job is found, some user_part are not found"); + delete new_job; + break; + } + if (xmlStrcmp(res_node->name, xmlCharStrdup("resource_params")) || + xmlStrcmp(maximum_duration_node->name, xmlCharStrdup("maximum_duration")) || + xmlStrcmp(queue_node->name, xmlCharStrdup("queue")) + ) + { + INFOS("A bad job is found, some user part node are not in the rigth or does not have a correct name"); + delete new_job; + break; + } + xmlChar* maximum_duration = xmlNodeGetContent(maximum_duration_node); + try + { + new_job->setMaximumDuration(std::string((const char *)maximum_duration)); + } + catch(const LauncherException &ex) + { + INFOS("Exception receice for maximum_duration, cannot add the job" << ex.msg.c_str()); + delete new_job; + xmlFree(maximum_duration); + break; + } + xmlChar* queue = xmlNodeGetContent(queue_node); + new_job->setQueue(std::string((const char *)queue)); + xmlFree(maximum_duration); + xmlFree(queue); + + xmlNodePtr res_name_node = xmlFirstElementChild(res_node); + xmlNodePtr res_hostname_node = xmlNextElementSibling(res_name_node); + xmlNodePtr res_os_node = xmlNextElementSibling(res_hostname_node); + xmlNodePtr res_nb_proc_node = xmlNextElementSibling(res_os_node); + xmlNodePtr res_nb_node_node = xmlNextElementSibling(res_nb_proc_node); + xmlNodePtr res_nb_proc_per_node_node = xmlNextElementSibling(res_nb_node_node); + xmlNodePtr res_cpu_clock_node = xmlNextElementSibling(res_nb_proc_per_node_node); + xmlNodePtr res_mem_mb_node = xmlNextElementSibling(res_cpu_clock_node); + if (res_name_node == NULL || + res_hostname_node == NULL || + res_os_node == NULL || + res_nb_proc_node == NULL || + res_nb_node_node == NULL || + res_nb_proc_per_node_node == NULL || + res_cpu_clock_node == NULL || + res_mem_mb_node == NULL + ) + { + INFOS("A bad job is found, some resource_params user_part are not found"); + delete new_job; + break; + } + if (xmlStrcmp(res_name_node->name, xmlCharStrdup("name")) || + xmlStrcmp(res_hostname_node->name, xmlCharStrdup("hostname")) || + xmlStrcmp(res_os_node->name, xmlCharStrdup("OS")) || + xmlStrcmp(res_nb_proc_node->name, xmlCharStrdup("nb_proc")) || + xmlStrcmp(res_nb_node_node->name, xmlCharStrdup("nb_node")) || + xmlStrcmp(res_nb_proc_per_node_node->name, xmlCharStrdup("nb_proc_per_node")) || + xmlStrcmp(res_cpu_clock_node->name, xmlCharStrdup("cpu_clock")) || + xmlStrcmp(res_mem_mb_node->name, xmlCharStrdup("mem_mb")) + ) + { + INFOS("A bad job is found, some resource_params user_part node are not in the rigth or does not have a correct name"); + delete new_job; + break; + } + xmlChar* res_name = xmlNodeGetContent(res_name_node); + xmlChar* res_hostname = xmlNodeGetContent(res_hostname_node); + xmlChar* res_os = xmlNodeGetContent(res_os_node); + resourceParams p; + p.name = std::string((const char*) res_name); + p.hostname = std::string((const char*) res_hostname); + p.OS = std::string((const char*) res_os); + xmlFree(res_name); + xmlFree(res_hostname); + xmlFree(res_os); + xmlChar* res_nb_proc = xmlNodeGetContent(res_nb_proc_node); + xmlChar* res_nb_node = xmlNodeGetContent(res_nb_node_node); + xmlChar* res_nb_proc_per_node = xmlNodeGetContent(res_nb_proc_per_node_node); + xmlChar* res_cpu_clock = xmlNodeGetContent(res_cpu_clock_node); + xmlChar* res_mem_mb = xmlNodeGetContent(res_mem_mb_node); + bool import_value = true; + std::istringstream nb_proc_stream((const char *) res_nb_proc); + if (!(nb_proc_stream >> p.nb_proc)) + import_value = false; + std::istringstream nb_node_stream((const char *) res_nb_node); + if (!(nb_node_stream >> p.nb_node)) + import_value = false; + std::istringstream nb_proc_per_node_stream((const char *) res_nb_proc_per_node); + if (!(nb_proc_per_node_stream >> p.nb_proc_per_node)) + import_value = false; + std::istringstream cpu_clock_stream((const char *) res_cpu_clock); + if (!(cpu_clock_stream >> p.cpu_clock)) + import_value = false; + std::istringstream mem_mb_stream((const char *) res_mem_mb); + if (!(mem_mb_stream >> p.mem_mb)) + import_value = false; + xmlFree(res_nb_proc); + xmlFree(res_nb_node); + xmlFree(res_nb_proc_per_node); + xmlFree(res_cpu_clock); + xmlFree(res_mem_mb); + if (!import_value) + { + INFOS("A bad job is found, some resource_params value are not correct"); + delete new_job; + break; + } + try + { + new_job->setResourceRequiredParams(p); + } + catch(const LauncherException &ex) + { + INFOS("A bad job is found, an error when inserting resource_params:" << ex.msg.c_str()); + delete new_job; + break; + } + + // We finally get run part to figure out what to do + xmlNodePtr job_state_node = xmlFirstElementChild(run_node); + xmlNodePtr resource_choosed_name_node = xmlNextElementSibling(job_state_node); + xmlNodePtr job_reference_node = xmlNextElementSibling(resource_choosed_name_node); + if (job_state_node == NULL || + resource_choosed_name_node == NULL || + job_reference_node == NULL + ) + { + INFOS("A bad job is found, some run_part are not found"); + delete new_job; + break; + } + if (xmlStrcmp(job_state_node->name, xmlCharStrdup("job_state")) || + xmlStrcmp(resource_choosed_name_node->name, xmlCharStrdup("resource_choosed_name")) || + xmlStrcmp(job_reference_node->name, xmlCharStrdup("job_reference")) + ) + { + INFOS("A bad job is found, some run_part nodes are not in the rigth or does not have a correct name"); + delete new_job; + break; + } + xmlChar* job_state_xml = xmlNodeGetContent(job_state_node); + xmlChar* resource_choosed_name_xml = xmlNodeGetContent(resource_choosed_name_node); + xmlChar* job_reference_xml = xmlNodeGetContent(job_reference_node); + std::string job_state((const char *) job_state_xml); + std::string resource_choosed_name((const char *) resource_choosed_name_xml); + std::string job_reference((const char *) job_reference_xml); + xmlFree(job_state_xml); + xmlFree(resource_choosed_name_xml); + xmlFree(job_reference_xml); + + // TODO: EVENT for observer ! + if (job_state == "CREATED") + { + // In this case, we ignore run_part informations + try + { + _l.createJob(new_job); + } + catch(const LauncherException &ex) + { + INFOS("Load failed: " << ex.msg.c_str()); + } + } + else if (job_state == "QUEUED" || + job_state == "RUNNING" || + job_state == "IN_PROCESS" || + job_state == "PAUSED") + { + try + { + // Step 1: Add the resource to the launcher C++ map + _l.checkFactoryForResource(resource_choosed_name); + + // Step 2: We add run_part informations + new_job->setState(job_state); + + // Step 3: We add the job to the launcher + ParserResourcesType resource_definition; + resource_definition.Name = resource_choosed_name; + new_job->setResourceDefinition(resource_definition); + _l.addJobDirectlyToMap(new_job, job_reference); + + // Step 4: We check that the BatchManager could resume + // the job + if (new_job->getBatchManagerJobId().getReference() != job_reference) + { + INFOS("BatchManager type cannot resume a job - job state is set to ERROR"); + new_job->setState("ERROR"); + } + } + catch(const LauncherException &ex) + { + INFOS("Cannot load the job! Exception: " << ex.msg.c_str()); + delete new_job; + } + } + else if (job_state == "FINISHED" || + job_state == "FAILED" || + job_state == "ERROR") + { + try + { + // Step 1: Add the resource to the launcher C++ map + _l.checkFactoryForResource(resource_choosed_name); + + // Step 2: We add run_part informations + new_job->setState(job_state); + + // Step 3: We add the job to the launcher + ParserResourcesType resource_definition; + resource_definition.Name = resource_choosed_name; + new_job->setResourceDefinition(resource_definition); + _l.addJobDirectlyToMap(new_job, job_reference); + } + catch(const LauncherException &ex) + { + INFOS("Cannot load the job! Exception: " << ex.msg.c_str()); + delete new_job; + } + } + else + { + INFOS("A bad job is found, state unknown " << job_state); + delete new_job; + } + + } + xmlCurrentNode = xmlCurrentNode->next; + } + } + else + { + xmlFreeDoc(doc); + fclose(xml_file); + std::string error = "Error in xml file, could not find root_node named jobs: " + std::string(jobs_file); + INFOS(error); + THROW_SALOME_CORBA_EXCEPTION(error.c_str(), SALOME::INTERNAL_ERROR); + } + + // Clean + xmlFreeDoc(doc); + fclose(xml_file); +} + +//============================================================================= +/*! CORBA Method: + * Save jobs of Launcher (in any steps) in file jobs_file + */ +//============================================================================= +void +SALOME_Launcher::saveJobs(const char* jobs_file) +{ + + // Step 1: check jobs_file write access + FILE* xml_file = fopen(jobs_file, "w"); + if (xml_file == NULL) + { + std::string error = "Error opening jobs_file in SALOME_Launcher::saveJobs: " + std::string(jobs_file); + INFOS(error); + THROW_SALOME_CORBA_EXCEPTION(error.c_str(), SALOME::INTERNAL_ERROR); + } + + // Step 2: First lines + xmlKeepBlanksDefault(0); + xmlDocPtr doc = xmlNewDoc(xmlCharStrdup("1.0")); + xmlNodePtr root_node = xmlNewNode(NULL, xmlCharStrdup("jobs")); + xmlDocSetRootElement(doc, root_node); + xmlNodePtr doc_comment = xmlNewDocComment(doc, xmlCharStrdup("SALOME Launcher save jobs file")); + xmlAddPrevSibling(root_node, doc_comment); + + // Step 3: For each job write it on the xml document + // We could put a mutex but are not foing to do that currently + std::map jobs_list = _l.getJobs(); + std::map::const_iterator it_job; + for(it_job = jobs_list.begin(); it_job != jobs_list.end(); it_job++) + { + it_job->second->addToXmlDocument(root_node); + } + + // Final step: write file + int isOk = xmlSaveFormatFile(jobs_file, doc, 1); + if (!isOk) + { + std::string error = "Error during xml file saving in SALOME_Launcher::saveJobs: " + std::string(jobs_file); + INFOS(error); + xmlFreeDoc(doc); + fclose(xml_file); + THROW_SALOME_CORBA_EXCEPTION(error.c_str(), SALOME::INTERNAL_ERROR); + } + + // Clean + xmlFreeDoc(doc); + fclose(xml_file); + MESSAGE("SALOME_Launcher::saveJobs : WRITING DONE!"); + //TODO: Generation evenement pour les observeurs +} diff --git a/src/Launcher/SALOME_Launcher.hxx b/src/Launcher/SALOME_Launcher.hxx index 46c0d566c..98bff0f66 100644 --- a/src/Launcher/SALOME_Launcher.hxx +++ b/src/Launcher/SALOME_Launcher.hxx @@ -59,6 +59,16 @@ public: void Shutdown(); CORBA::Long getPID(); + // Observer and introspection methods + virtual void addObserver(Engines::SalomeLauncherObserver_ptr observer); + virtual Engines::JobsList * getJobsList(); + virtual Engines::JobParameters * getJobParameters(CORBA::Long job_id); + + // Save and load methods + virtual void loadJobs(const char* jobs_file); + virtual void saveJobs(const char* jobs_file); + + static const char *_LauncherNameInNS; protected: diff --git a/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py b/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py index 417cf4e16..57bcebff5 100644 --- a/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py +++ b/src/LifeCycleCORBA_SWIG/LifeCycleCORBA.py @@ -57,9 +57,9 @@ class ResourceParameters (Engines.ResourceParameters): policy, resList) class JobParameters (Engines.JobParameters): - def __init__(self, job_type="", job_file="", env_file="", in_files=[], out_files=[], + def __init__(self, job_name="", job_type="", job_file="", env_file="", in_files=[], out_files=[], work_directory="", local_directory="", result_directory="", maximum_duration="", resource_required=None, queue="", specific_parameters=[]): - Engines.JobParameters.__init__(self, job_type, job_file, env_file, in_files, out_files, + Engines.JobParameters.__init__(self, job_name, job_type, job_file, env_file, in_files, out_files, work_directory, local_directory, result_directory, maximum_duration, resource_required, queue, specific_parameters)