1 // Copyright (C) 2007-2016 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
30 #include <libbatch/BatchManagerCatalog.hxx>
31 #include <libbatch/FactBatchManager.hxx>
32 #include <libbatch/BatchManager.hxx>
35 #include "Basics_Utils.hxx"
36 #include "Basics_DirUtils.hxx"
37 #include "SALOME_Launcher_Handler.hxx"
38 #include "Launcher.hxx"
39 #include "Launcher_Job_Command.hxx"
40 #include "Launcher_XML_Persistence.hxx"
44 //=============================================================================
48 * Define a CORBA single-thread policy for the server, which avoids having to deal
49 * with non-thread-safe usage like Change_Directory in the SALOME naming service
51 //=============================================================================
// Constructor: logs its invocation, then heap-allocates and initializes the
// pthread mutex (_job_cpt_mutex) that createJob() and addJobDirectlyToMap()
// lock while reading the _job_cpt counter.
// NOTE(review): this listing omits some source lines (the embedded numbering
// jumps), so other initialization may exist that is not shown here.
52 Launcher_cpp::Launcher_cpp()
54 LAUNCHER_MESSAGE("Launcher_cpp constructor");
// The mutex is created with default attributes (NULL).
56 _job_cpt_mutex = new pthread_mutex_t();
57 pthread_mutex_init(_job_cpt_mutex, NULL);
60 //=============================================================================
64 //=============================================================================
// Destructor: deletes every job stored in _launcher_job_map, walks _batchmap,
// then destroys and frees the mutex allocated by the constructor.
// NOTE(review): the body of the _batchmap loop is not visible in this listing
// (the embedded numbering jumps from 73 to 77); presumably it deletes each
// batch manager - confirm against the full source.
65 Launcher_cpp::~Launcher_cpp()
67 LAUNCHER_MESSAGE("Launcher_cpp destructor");
// Jobs are held as raw pointers in the map and are deleted here.
69 std::map<int, Launcher::Job *>::const_iterator it_job;
70 for(it_job = _launcher_job_map.begin(); it_job != _launcher_job_map.end(); it_job++)
71 delete it_job->second;
72 std::map <int, Batch::BatchManager * >::const_iterator it1;
73 for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
// Release the mutex last: destroy the pthread object, then free its storage.
77 pthread_mutex_destroy(_job_cpt_mutex);
78 delete _job_cpt_mutex;
83 //=============================================================================
85 * Add a job into the launcher - check resource and choose one
87 //=============================================================================
// Register a new job in the launcher.
// The job is numbered with the current value of _job_cpt (read while holding
// _job_cpt_mutex) and then stored in _launcher_job_map under that number.
// Throws LauncherException when the map already holds a job with that number.
// NOTE(review): this listing omits some source lines (the embedded numbering
// skips 95, 99, 101-103, ...), so the counter update and the if/else
// structure are not visible here.
89 Launcher_cpp::createJob(Launcher::Job * new_job)
91 LAUNCHER_MESSAGE("Creating a new job");
92 // Add job to the jobs map
93 pthread_mutex_lock(_job_cpt_mutex);
94 new_job->setNumber(_job_cpt);
96 pthread_mutex_unlock(_job_cpt_mutex);
// The map lookup and insertion below happen after the mutex is released.
97 std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
98 if (it_job == _launcher_job_map.end())
100 _launcher_job_map[new_job->getNumber()] = new_job;
104 LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
106 throw LauncherException("A job as already the same id - job is not created !");
108 LAUNCHER_MESSAGE("New Job created");
111 //=============================================================================
115 //=============================================================================
// Submit a previously created job to its batch manager.
// \param job_id launcher number of the job; findJob() throws if it is unknown.
// Throws LauncherException when the job is not in the "CREATED" state, or
// when the batch manager raises Batch::GenericException during submission.
// NOTE(review): the `try` keyword and the braces are not visible in this
// listing (the embedded numbering jumps), so exact scoping cannot be checked.
117 Launcher_cpp::launchJob(int job_id)
119 LAUNCHER_MESSAGE("Launch a job");
121 // Check if job exist
122 Launcher::Job * job = findJob(job_id);
124 // Check job state (cannot launch a job already launched...)
125 if (job->getState() != "CREATED")
127 LAUNCHER_INFOS("Bad state of the job: " << job->getState());
128 throw LauncherException("Bad state of the job: " + job->getState());
131 // Third step search batch manager for the job into the map -> instantiate one if does not exist
132 std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
133 if(it == _batchmap.end())
135 createBatchManagerForJob(job);
// Submit, then record the batch manager's job id and reference on the job
// and mark it as QUEUED.
139 Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
140 job->setBatchManagerJobId(batch_manager_job_id);
141 job->setState("QUEUED");
142 job->setReference(batch_manager_job_id.getReference());
// Batch-layer failures are logged and rethrown as LauncherException.
144 catch(const Batch::GenericException &ex)
146 LAUNCHER_INFOS("Job is not launched, exception in submitJob: " << ex.message);
147 throw LauncherException(ex.message.c_str());
149 LAUNCHER_MESSAGE("Job launched");
152 //=============================================================================
156 //=============================================================================
// Ask the job to refresh its state and return the result.
// \param job_id launcher number of the job; findJob() throws if it is unknown.
// A Batch::GenericException raised by the update is logged and rethrown as
// LauncherException.
// NOTE(review): the value returned is state.c_str(), but neither the
// declaration of `state` nor this function's return type is visible in this
// listing. If `state` is a local std::string and the return type is a raw
// character pointer, the caller receives a dangling pointer - confirm.
158 Launcher_cpp::getJobState(int job_id)
160 LAUNCHER_MESSAGE("Get job state");
162 // Check if job exist
163 Launcher::Job * job = findJob(job_id);
168 state = job->updateJobState();
170 catch(const Batch::GenericException &ex)
172 LAUNCHER_INFOS("getJobState failed, exception: " << ex.message);
173 throw LauncherException(ex.message.c_str());
176 return state.c_str();
179 //=============================================================================
181 * Get job assigned hostnames
183 //=============================================================================
// Return the hostnames assigned to a job, as reported by the job object.
// \param job_id launcher number of the job; findJob() throws if it is unknown.
// NOTE(review): the result is c_str() of the local std::string
// `assigned_hostnames`. The return type is not visible in this listing; if it
// is a raw character pointer, that pointer refers to storage destroyed when
// this function returns - confirm.
185 Launcher_cpp::getAssignedHostnames(int job_id)
187 LAUNCHER_MESSAGE("Get job assigned hostnames");
189 // Check if job exist
190 Launcher::Job * job = findJob(job_id);
191 std::string assigned_hostnames = job->getAssignedHostnames();
193 return assigned_hostnames.c_str();
196 //=============================================================================
198 * Get Job result - the result directory could be changed
200 //=============================================================================
// Import the output files of a job through its batch manager.
// \param job_id    launcher number of the job; findJob() throws if unknown.
// \param directory destination directory passed by the caller.
// Two import calls are visible: one targets `directory`, the other the job's
// own result directory.
// NOTE(review): the condition choosing between them is not visible in this
// listing; presumably the job's result directory is the fallback when
// `directory` is empty - confirm.
// Batch::GenericException is logged and rethrown as LauncherException.
202 Launcher_cpp::getJobResults(int job_id, std::string directory)
204 LAUNCHER_MESSAGE("Get Job results");
206 Launcher::Job * job = findJob(job_id);
// NOTE(review): resource_name is not used by any line visible in this listing.
207 std::string resource_name = job->getResourceDefinition().Name;
211 _batchmap[job_id]->importOutputFiles(*(job->getBatchJob()), directory);
213 _batchmap[job_id]->importOutputFiles(*(job->getBatchJob()), job->getResultDirectory());
215 catch(const Batch::GenericException &ex)
217 LAUNCHER_INFOS("getJobResult is maybe incomplete, exception: " << ex.message);
218 throw LauncherException(ex.message.c_str());
220 LAUNCHER_MESSAGE("getJobResult ended");
223 //=============================================================================
225 * Clear the remote working directory
227 //=============================================================================
// Clear the remote working directory of a job through its batch manager.
// \param job_id launcher number of the job; findJob() throws if it is unknown.
// A Batch::GenericException raised by the batch manager is logged and
// rethrown as LauncherException.
// The log messages name this operation (they previously mentioned
// getJobResult, copied from getJobResults()).
// NOTE(review): the `try` keyword and the braces are not visible in this
// listing (the embedded numbering jumps).
229 Launcher_cpp::clearJobWorkingDir(int job_id)
231 LAUNCHER_MESSAGE("Clear the remote working directory");
233 Launcher::Job * job = findJob(job_id);
236 _batchmap[job_id]->clearWorkingDir(*(job->getBatchJob()));
238 catch(const Batch::GenericException &ex)
240 LAUNCHER_INFOS("clearJobWorkingDir failed, exception: " << ex.message);
241 throw LauncherException(ex.message.c_str());
243 LAUNCHER_MESSAGE("clearJobWorkingDir ended");
246 //=============================================================================
248 * Get Job dump state - the result directory could be changed
250 //=============================================================================
// Import the dump-state file of a job through its batch manager.
// \param job_id    launcher number of the job; findJob() throws if unknown.
// \param directory destination directory passed by the caller.
// Two import calls are visible: one targets `directory`, the other the job's
// own result directory; the batch manager's result is stored in `rtn`.
// NOTE(review): the declaration and return of `rtn`, and the condition
// choosing between the two calls, are not visible in this listing.
// Batch::GenericException is logged and rethrown as LauncherException.
// The log messages name this operation (they previously mentioned
// getJobResult, copied from getJobResults()).
252 Launcher_cpp::getJobDumpState(int job_id, std::string directory)
255 LAUNCHER_MESSAGE("Get Job dump state");
257 Launcher::Job * job = findJob(job_id);
// NOTE(review): resource_name is not used by any line visible in this listing.
258 std::string resource_name = job->getResourceDefinition().Name;
262 rtn = _batchmap[job_id]->importDumpStateFile(*(job->getBatchJob()), directory);
264 rtn = _batchmap[job_id]->importDumpStateFile(*(job->getBatchJob()), job->getResultDirectory());
266 catch(const Batch::GenericException &ex)
268 LAUNCHER_INFOS("getJobDumpState is maybe incomplete, exception: " << ex.message);
269 throw LauncherException(ex.message.c_str());
271 LAUNCHER_MESSAGE("getJobDumpState ended");
275 //=============================================================================
277 * Get one file from the working directory - the result directory can be changed
279 //=============================================================================
// Import one file from the working directory of a job through its batch
// manager.
// \param job_id    launcher number of the job; findJob() throws if unknown.
// \param work_file name of the file to fetch.
// \param directory destination directory passed by the caller.
// Two import calls are visible: one targets `directory`, the other the job's
// own result directory; the batch manager's result is stored in `rtn`.
// NOTE(review): the declaration and return of `rtn`, and the condition
// choosing between the two calls, are not visible in this listing.
// Batch::GenericException is logged and rethrown as LauncherException.
281 Launcher_cpp::getJobWorkFile(int job_id,
282 std::string work_file,
283 std::string directory)
286 LAUNCHER_MESSAGE("Get working file " << work_file);
288 Launcher::Job * job = findJob(job_id);
// NOTE(review): resource_name is not used by any line visible in this listing.
289 std::string resource_name = job->getResourceDefinition().Name;
293 rtn = _batchmap[job_id]->importWorkFile(*(job->getBatchJob()), work_file, directory);
295 rtn = _batchmap[job_id]->importWorkFile(*(job->getBatchJob()), work_file, job->getResultDirectory());
297 catch(const Batch::GenericException &ex)
299 LAUNCHER_INFOS("getJobWorkFile is maybe incomplete, exception: " << ex.message);
300 throw LauncherException(ex.message.c_str());
302 LAUNCHER_MESSAGE("getJobWorkFile ended");
306 //=============================================================================
308 * Remove the job - into the Launcher and its batch manager
310 //=============================================================================
// Remove a job from the launcher.
// \param job_id launcher number of the job.
// Throws LauncherException when no job with that number is registered.
// Otherwise the job's own removeJob() is called, the job object is deleted
// and its entry is erased from _launcher_job_map.
// NOTE(review): no line visible here touches the _batchmap entry for job_id.
312 Launcher_cpp::removeJob(int job_id)
314 LAUNCHER_MESSAGE("Remove Job");
316 // Check if job exist
// The lookup is done here rather than through findJob() because the iterator
// is needed for the erase below.
317 std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
318 if (it_job == _launcher_job_map.end())
320 LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
321 throw LauncherException("Cannot find the job, is it created ?");
324 it_job->second->removeJob();
325 delete it_job->second;
326 _launcher_job_map.erase(it_job);
329 //=============================================================================
333 //=============================================================================
// Stop a job.
// \param job_id launcher number of the job; findJob() throws if it is unknown.
// NOTE(review): only the logging and the job lookup are visible in this
// listing (the embedded numbering jumps from 339 to 344); the call that
// actually stops the job is not shown.
335 Launcher_cpp::stopJob(int job_id)
337 LAUNCHER_MESSAGE("Stop Job");
339 Launcher::Job * job = findJob(job_id);
// Serialize a job with Launcher::XML_Persistence::dumpJob() and return the
// result.
// \param job_id launcher number of the job; findJob() throws if it is unknown.
344 Launcher_cpp::dumpJob(int job_id)
346 LAUNCHER_MESSAGE("dump Job");
348 Launcher::Job * job = findJob(job_id);
349 return Launcher::XML_Persistence::dumpJob(*job);
// Rebuild a job from its dumped string form and add it to the launcher.
// \param dumpedJob serialized job, parsed by
//        Launcher::XML_Persistence::createJobFromString().
// The new job is passed to addJob(), whose result is kept in `jobId`.
// A LauncherException is caught and logged.
// NOTE(review): the declaration and return of `jobId`, and what happens to
// new_job after a failure, are not visible in this listing.
353 Launcher_cpp::restoreJob(const std::string& dumpedJob)
355 LAUNCHER_MESSAGE("restore Job");
356 Launcher::Job * new_job=NULL;
360 new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
363 jobId = addJob(new_job);
368 catch(const LauncherException &ex)
370 LAUNCHER_INFOS("Cannot load the job. Exception: " << ex.msg.c_str());
377 //=============================================================================
379 * create a launcher job based on a file
380 * \param xmlExecuteFile : to define the execution on the batch cluster
382 //=============================================================================
// Build a command job from an XML execution description.
// \param xmlExecuteFile XML file parsed by ParseXmlFile().
// \param clusterName    key used in job_params.MachinesList and hostname of
//                       the required resource.
// Returns the number of the new job.
// NOTE(review): this listing omits some source lines (the embedded numbering
// jumps); the declarations of `os` and `p`, and the call that registers the
// job with the launcher, are not visible here.
384 Launcher_cpp::createJobWithFile(const std::string xmlExecuteFile,
385 const std::string clusterName)
387 LAUNCHER_MESSAGE("Begin of Launcher_cpp::createJobWithFile");
390 ParserLauncherType job_params = ParseXmlFile(xmlExecuteFile);
392 // Creating a new job
393 Launcher::Job_Command * new_job = new Launcher::Job_Command();
// The command from the XML file is written to a temporary shell script,
// which becomes the job file.
395 std::string cmdFile = Kernel_Utils::GetTmpFileName();
402 os.open(cmdFile.c_str(), std::ofstream::out );
403 os << "#! /bin/sh" << std::endl;
404 os << job_params.Command;
407 new_job->setJobFile(cmdFile);
408 new_job->setLocalDirectory(job_params.RefDirectory);
// NOTE(review): if MachinesList is a std::map, operator[] inserts a
// default-constructed entry when clusterName is unknown - confirm.
409 new_job->setWorkDirectory(job_params.MachinesList[clusterName].WorkDirectory);
410 new_job->setEnvFile(job_params.MachinesList[clusterName].EnvFile);
// Forward the input and output file lists to the job.
412 for(int i=0; i < job_params.InputFile.size(); i++)
413 new_job->add_in_file(job_params.InputFile[i]);
414 for(int i=0; i < job_params.OutputFile.size();i++)
415 new_job->add_out_file(job_params.OutputFile[i]);
// Required resource: the named cluster, with the process count from the XML.
418 p.hostname = clusterName;
421 p.nb_proc = job_params.NbOfProcesses;
423 p.nb_proc_per_node = 0;
426 new_job->setResourceRequiredParams(p);
429 return new_job->getNumber();
432 //=============================================================================
434 * Factory to instantiate the good batch manager for chosen cluster.
436 //=============================================================================
// Instantiate a batch manager for the resource described by `params`.
// The resource's Protocol is mapped to a Batch::CommunicationProtocolType and
// its Batch field selects the batch manager type (`bmType`). The matching
// factory is then fetched from Batch::BatchManagerCatalog and invoked with
// the host name, user name, protocol and `mpi`.
// Throws LauncherException for an unknown protocol, a bad batch description,
// or a missing factory.
// NOTE(review): the case labels, the declarations of `bmType` and `mpi`, the
// null check on `fact` and the return statement are not visible in this
// listing (the embedded numbering jumps, e.g. from 461 to 493).
437 Batch::BatchManager *
438 Launcher_cpp::FactoryBatchManager(ParserResourcesType& params)
441 Batch::CommunicationProtocolType protocol;
442 Batch::FactBatchManager * fact;
444 std::string hostname = params.HostName;
446 switch(params.Protocol)
449 protocol = Batch::SH;
452 protocol = Batch::RSH;
455 protocol = Batch::SSH;
458 protocol = Batch::RSYNC;
461 throw LauncherException("Unknown protocol for this resource");
493 switch( params.Batch )
526 LAUNCHER_MESSAGE("Bad batch description of the resource: Batch = " << params.Batch);
527 throw LauncherException("No batchmanager for that cluster - Bad batch description of the resource");
// Look the factory up by type name in the libBatch catalog.
529 Batch::BatchManagerCatalog & cata = Batch::BatchManagerCatalog::getInstance();
530 fact = dynamic_cast<Batch::FactBatchManager*>(cata(bmType));
532 LAUNCHER_MESSAGE("Cannot find batch manager factory for " << bmType << ". Check your version of libBatch.");
533 throw LauncherException("Cannot find batch manager factory");
535 LAUNCHER_MESSAGE("Instantiation of batch manager of type: " << bmType);
536 Batch::BatchManager * batch_client = (*fact)(hostname.c_str(), params.UserName.c_str(),
537 protocol, mpi.c_str());
541 //----------------------------------------------------------
542 // Without LIBBATCH - Launcher_cpp does nothing...
543 //----------------------------------------------------------
// Fallback definition from the "Without LIBBATCH" section: job creation is
// unavailable, so this logs the fact and always throws LauncherException.
547 Launcher_cpp::createJob(Launcher::Job * new_job)
549 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot create a job !!!");
551 throw LauncherException("Method Launcher_cpp::createJob is not available "
552 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: launching is
// unavailable, so this logs the fact and always throws LauncherException.
556 Launcher_cpp::launchJob(int job_id)
558 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot launch a job !!!");
559 throw LauncherException("Method Launcher_cpp::launchJob is not available "
560 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: job state cannot
// be queried, so this logs the fact and always throws LauncherException.
564 Launcher_cpp::getJobState(int job_id)
566 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job state!!!");
567 throw LauncherException("Method Launcher_cpp::getJobState is not available "
568 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: assigned hostnames
// cannot be queried, so this logs the fact and always throws
// LauncherException.
572 Launcher_cpp::getAssignedHostnames(int job_id)
574 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job assigned hostnames!!!");
575 throw LauncherException("Method Launcher_cpp::getAssignedHostnames is not available "
576 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: job results cannot
// be fetched, so this logs the fact and always throws LauncherException.
580 Launcher_cpp::getJobResults(int job_id, std::string directory)
582 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job results!!!");
583 throw LauncherException("Method Launcher_cpp::getJobResults is not available "
584 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: the working
// directory cannot be cleared, so this logs the fact and always throws
// LauncherException.
588 Launcher_cpp::clearJobWorkingDir(int job_id)
590 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot clear directory!!!");
591 throw LauncherException("Method Launcher_cpp::clearJobWorkingDir is not available "
592 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: the dump state
// cannot be fetched, so this logs the fact and always throws
// LauncherException.
596 Launcher_cpp::getJobDumpState(int job_id, std::string directory)
598 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job dump state!!!");
599 throw LauncherException("Method Launcher_cpp::getJobDumpState is not available "
600 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: work files cannot
// be fetched, so this logs the fact and always throws LauncherException.
// The log message names this operation (it previously said "cannot get job
// dump state", copied from getJobDumpState()).
604 Launcher_cpp::getJobWorkFile(int job_id, std::string work_file, std::string directory)
606 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot get job work file!!!");
607 throw LauncherException("Method Launcher_cpp::getJobWorkFile is not available "
608 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: jobs cannot be
// removed, so this logs the fact and always throws LauncherException.
612 Launcher_cpp::removeJob(int job_id)
614 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot remove job!!!");
615 throw LauncherException("Method Launcher_cpp::removeJob is not available "
616 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: jobs cannot be
// stopped, so this always throws LauncherException.
// NOTE(review): unlike most sibling fallbacks, no LAUNCHER_INFOS call is
// visible here.
620 Launcher_cpp::stopJob(int job_id)
622 throw LauncherException("Method Launcher_cpp::stopJob is not available "
623 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: jobs cannot be
// dumped, so this logs the fact and always throws LauncherException.
627 Launcher_cpp::dumpJob(int job_id)
629 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot dump job!!!");
630 throw LauncherException("Method Launcher_cpp::dumpJob is not available "
631 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: jobs cannot be
// restored, so this logs the fact and always throws LauncherException.
636 Launcher_cpp::restoreJob(const std::string& dumpedJob)
638 LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot restore job!!!");
639 throw LauncherException("Method Launcher_cpp::restoreJob is not available "
640 "(libBatch was not present at compilation time)");
// Fallback definition from the "Without LIBBATCH" section: jobs cannot be
// created from an XML file, so this always throws LauncherException.
// NOTE(review): unlike most sibling fallbacks, no LAUNCHER_INFOS call is
// visible here.
645 Launcher_cpp::createJobWithFile( const std::string xmlExecuteFile, std::string clusterName)
647 throw LauncherException("Method Launcher_cpp::createJobWithFile is not available "
648 "(libBatch was not present at compilation time)");
// Parse an XML execution file into a ParserLauncherType.
// \param xmlExecuteFile path of the XML file.
// The file is first opened with fopen() to check that it is readable, then
// read with xmlReadFile() and handed to a SALOME_Launcher_Handler that was
// constructed on job_params.
// Throws LauncherException when the file is not readable or cannot be parsed.
// NOTE(review): the release of `handler` (allocated with new), `aFile` and
// `aDoc`, the branch conditions and the return statement are not visible in
// this listing; check that both throwing paths free these resources.
// NOTE(review): the error messages are prefixed "ResourcesManager_cpp:"
// although this method belongs to Launcher_cpp - looks copied; confirm.
655 Launcher_cpp::ParseXmlFile(std::string xmlExecuteFile)
657 ParserLauncherType job_params;
658 SALOME_Launcher_Handler * handler = new SALOME_Launcher_Handler(job_params);
660 const char* aFilePath = xmlExecuteFile.c_str();
661 FILE* aFile = fopen(aFilePath, "r");
664 xmlDocPtr aDoc = xmlReadFile(aFilePath, NULL, 0);
666 handler->ProcessXmlDocument(aDoc);
669 std::string message = "ResourcesManager_cpp: could not parse file: " + xmlExecuteFile;
670 LAUNCHER_MESSAGE(message);
672 throw LauncherException(message);
680 std::string message = "ResourcesManager_cpp: file is not readable: " + xmlExecuteFile;
681 LAUNCHER_MESSAGE(message);
683 throw LauncherException(message);
// Return a copy of the job map (job number -> job pointer).
// The map is returned by value, but the Launcher::Job objects it points to
// are the same ones the launcher deletes in removeJob() and in its
// destructor, so callers must not delete them.
691 std::map<int, Launcher::Job *>
692 Launcher_cpp::getJobs()
694 return _launcher_job_map;
// Select a resource for the job and make sure _batchmap holds a batch manager
// under the job's number.
// Steps visible here:
//  - ask _ResManager for the resources fitting the job's required parameters,
//    restricted to those that can launch batch jobs;
//  - if none fits, set the job state to "ERROR" and throw LauncherException;
//  - hand the description of the first fitting resource to the job;
//  - if _batchmap has no entry for the job number, build a manager with
//    FactoryBatchManager() and store it.
// ResourcesException and Batch::GenericException are converted to
// LauncherException.
// NOTE(review): the `try` keywords and whatever follows the logging in the
// two LauncherException handlers (after 734 and 753) are not visible in this
// listing; presumably the exception is rethrown - confirm.
698 Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
700 int job_id = job->getNumber();
702 // Select a resource for the job
703 std::vector<std::string> ResourceList;
704 resourceParams params = job->getResourceRequiredParams();
705 // Consider only resources that can launch batch jobs
706 params.can_launch_batch_jobs = true;
709 ResourceList = _ResManager->GetFittingResources(params);
711 catch(const ResourcesException &ex)
713 throw LauncherException(ex.msg.c_str());
715 if (ResourceList.size() == 0)
717 LAUNCHER_INFOS("No adequate resource found for the job, number " << job->getNumber());
718 job->setState("ERROR");
719 throw LauncherException("No resource found the job");
722 // Configure the job with the resource selected - the first of the list
723 ParserResourcesType resource_definition = _ResManager->GetResourcesDescr(ResourceList[0]);
725 // Set resource definition to the job
726 // The job will check if the definitions needed
729 job->setResourceDefinition(resource_definition);
731 catch(const LauncherException &ex)
733 LAUNCHER_INFOS("Error in the definition of the resource, mess: " << ex.msg);
734 job->setState("ERROR");
738 // Step 2: We can now add a Factory if the resource is correctly defined
740 std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
741 if(it == _batchmap.end())
745 // Warning: this cannot be written on one line as below, because the map entry
746 // is constructed before the method is called...
747 //_batchmap[job_id] = FactoryBatchManager(resource_definition);
748 Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
749 _batchmap[job_id] = batch_client;
751 catch(const LauncherException &ex)
753 LAUNCHER_INFOS("Error during creation of the batch manager of the job, mess: " << ex.msg);
756 catch(const Batch::GenericException &ex)
758 LAUNCHER_INFOS("Error during creation of the batch manager of the job, mess: " << ex.message);
759 throw LauncherException(ex.message);
// Register an existing job (one that already carries a reference) with the
// launcher and with a batch manager, without submitting it.
// The job is numbered with the current value of _job_cpt (read under
// _job_cpt_mutex), gets a batch manager through createBatchManagerForJob(),
// is attached to that manager with addJob() using the job's reference, and is
// finally stored in _launcher_job_map.
// Throws LauncherException when the batch manager raises
// Batch::GenericException or when the job number is already present in the
// map.
// NOTE(review): the update of _job_cpt (line 771), the `try` keyword and the
// if/else structure are not visible in this listing.
766 Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
768 // Step 0: Calculated job_id
769 pthread_mutex_lock(_job_cpt_mutex);
770 int job_id = _job_cpt;
772 new_job->setNumber(job_id);
773 pthread_mutex_unlock(_job_cpt_mutex);
775 // Step 1: check if resource is already in the map
776 createBatchManagerForJob(new_job);
778 // Step 2: add the job to the batch manager
782 Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
783 new_job->getReference());
784 new_job->setBatchManagerJobId(batch_manager_job_id);
786 catch(const Batch::GenericException &ex)
788 LAUNCHER_INFOS("Job cannot be added, exception in addJob: " << ex.message);
789 throw LauncherException(ex.message.c_str());
792 // Step 3: add job to launcher map
793 std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
794 if (it_job == _launcher_job_map.end())
796 _launcher_job_map[new_job->getNumber()] = new_job;
800 LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
802 throw LauncherException("A job as already the same id - job is not created !");
804 LAUNCHER_MESSAGE("New job added");
// Add a job to the launcher according to the state it already carries; the
// job number is kept in `jobId`.
//  - "CREATED": the registration call is not visible in this listing; the
//    job's number is taken afterwards.
//  - "QUEUED", "RUNNING", "IN_PROCESS", "PAUSED": the job is registered with
//    addJobDirectlyToMap(); if the reference returned by the batch manager
//    differs from the job's own reference, the job state is set to "ERROR".
//  - "FINISHED", "FAILED", "ERROR": the job is registered with
//    addJobDirectlyToMap().
//  - any other state is logged as a bad job.
// NOTE(review): the declaration and return of `jobId` and the final `else`
// are not visible in this listing; restoreJob() and loadJobs() assign this
// function's result to their own jobId.
809 Launcher_cpp::addJob(Launcher::Job * new_job)
811 string job_state = new_job->getState();
813 if (job_state == "CREATED")
815 // In this case, we ignore run_part information
817 jobId = new_job->getNumber();
819 else if (job_state == "QUEUED" ||
820 job_state == "RUNNING" ||
821 job_state == "IN_PROCESS" ||
822 job_state == "PAUSED")
824 addJobDirectlyToMap(new_job);
825 jobId = new_job->getNumber();
827 // We check that the BatchManager could resume the job
829 if (new_job->getBatchManagerJobId().getReference() != new_job->getReference())
831 LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR");
832 new_job->setState("ERROR");
836 else if (job_state == "FINISHED" ||
837 job_state == "FAILED" ||
838 job_state == "ERROR")
840 // We add run_part information
841 addJobDirectlyToMap(new_job);
842 jobId = new_job->getNumber();
846 LAUNCHER_INFOS("A bad job is found, state unknown " << job_state);
// Load jobs from an XML file and add each of them to the launcher.
// \param jobs_file path handed to Launcher::XML_Persistence::loadJobs().
// Returns the list of job ids collected from addJob().
// A LauncherException raised while adding a job is caught and logged.
// NOTE(review): the declaration of `jobId`, the `try` keyword, any condition
// guarding the push_back, and what happens to a job that failed to load are
// not visible in this listing.
853 Launcher_cpp::loadJobs(const char* jobs_file)
855 list<int> new_jobs_id_list;
857 // Load the jobs from XML file
858 list<Launcher::Job *> jobs_list = Launcher::XML_Persistence::loadJobs(jobs_file);
860 // Create each job in the launcher
861 list<Launcher::Job *>::const_iterator it_job;
862 for (it_job = jobs_list.begin(); it_job != jobs_list.end(); it_job++)
864 Launcher::Job * new_job = *it_job;
868 jobId = addJob(new_job);
870 new_jobs_id_list.push_back(jobId);
874 catch(const LauncherException &ex)
876 LAUNCHER_INFOS("Cannot load the job. Exception: " << ex.msg.c_str());
881 return new_jobs_id_list;
// Save the launcher's jobs to an XML file, ordered by job number.
// \param jobs_file path handed to Launcher::XML_Persistence::saveJobs().
// Every number from 0 up to (but excluding) _job_cpt is looked up in
// _launcher_job_map, and the jobs found are appended in that order.
// NOTE(review): _job_cpt is read here without taking _job_cpt_mutex.
// NOTE(review): std::map already iterates in ascending key order, so a direct
// traversal of the map would give the same ordering.
885 Launcher_cpp::saveJobs(const char* jobs_file)
887 // Create a sorted list from the internal job map
888 list<const Launcher::Job *> jobs_list;
889 for (int i=0; i<_job_cpt; i++)
891 map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
892 if (it_job != _launcher_job_map.end())
893 jobs_list.push_back(it_job->second);
896 // Save the jobs in XML file
897 Launcher::XML_Persistence::saveJobs(jobs_file, jobs_list);
// Look a job up by its launcher number.
// \param job_id launcher number of the job.
// Throws LauncherException (after logging) when _launcher_job_map has no
// entry for job_id; otherwise the mapped job pointer is taken from the map.
// NOTE(review): the return statement is not visible in this listing; callers
// use the result as a Launcher::Job pointer.
901 Launcher_cpp::findJob(int job_id)
903 std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
904 if (it_job == _launcher_job_map.end())
906 LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
907 throw LauncherException("Cannot find the job, is it created ?");
909 Launcher::Job * job = it_job->second;