{
LAUNCHER_MESSAGE("Launcher_cpp constructor");
_job_cpt = 0;
- _job_cpt_mutex = new pthread_mutex_t();
- pthread_mutex_init(_job_cpt_mutex, NULL);
}
//=============================================================================
for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
delete it1->second;
#endif
-
- pthread_mutex_destroy(_job_cpt_mutex);
- delete _job_cpt_mutex;
}
#ifdef WITH_LIBBATCH
{
LAUNCHER_MESSAGE("Creating a new job");
// Add job to the jobs map
- pthread_mutex_lock(_job_cpt_mutex);
new_job->setNumber(_job_cpt);
_job_cpt++;
- pthread_mutex_unlock(_job_cpt_mutex);
std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
if (it_job == _launcher_job_map.end())
+ {
_launcher_job_map[new_job->getNumber()] = new_job;
+ }
else
{
- LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
+ LAUNCHER_INFOS("A job has already the same id: " << new_job->getNumber());
delete new_job;
- throw LauncherException("A job as already the same id - job is not created !");
+ throw LauncherException("A job has already the same id - job is not created !");
}
LAUNCHER_MESSAGE("New Job created");
}
{
LAUNCHER_MESSAGE("Launch a job");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ // Check if job exists
+ Launcher::Job * job = findJob(job_id);
// Check job state (cannot launch a job already launched...)
if (job->getState() != "CREATED")
throw LauncherException("Bad state of the job: " + job->getState());
}
- // Third step search batch manager for the job into the map -> instantiate one if does not exist
-#ifdef WITH_LIBBATCH
- std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
- if(it == _batchmap.end())
- {
- createBatchManagerForJob(job);
- }
-#endif
+ Batch::BatchManager * bm = getBatchManager(job);
try {
- Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
+ Batch::JobId batch_manager_job_id = bm->submitJob(*(job->getBatchJob()));
job->setBatchManagerJobId(batch_manager_job_id);
job->setState("QUEUED");
job->setReference(batch_manager_job_id.getReference());
LAUNCHER_MESSAGE("Get job state");
// Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string state;
try
LAUNCHER_MESSAGE("Get job assigned hostnames");
// Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string assigned_hostnames = job->getAssignedHostnames();
return assigned_hostnames.c_str();
{
LAUNCHER_MESSAGE("Get Job results");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string resource_name = job->getResourceDefinition().Name;
try
{
{
LAUNCHER_MESSAGE("Clear the remote working directory");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
try
{
_batchmap[job_id]->clearWorkingDir(*(job->getBatchJob()));
bool rtn;
LAUNCHER_MESSAGE("Get Job dump state");
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string resource_name = job->getResourceDefinition().Name;
try
{
bool rtn;
LAUNCHER_MESSAGE("Get working file " << work_file);
- // Check if job exist
- std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
- {
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
- }
-
- Launcher::Job * job = it_job->second;
+ Launcher::Job * job = findJob(job_id);
std::string resource_name = job->getResourceDefinition().Name;
try
{
{
LAUNCHER_MESSAGE("Stop Job");
- // Check if job exist
- std::map<int, Launcher::Job *>::iterator it_job = _launcher_job_map.find(job_id);
- if (it_job == _launcher_job_map.end())
+ Launcher::Job * job = findJob(job_id);
+ job->stopJob();
+}
+
+std::string
+Launcher_cpp::dumpJob(int job_id)
+{
+ LAUNCHER_MESSAGE("dump Job");
+
+ Launcher::Job * job = findJob(job_id);
+ return Launcher::XML_Persistence::dumpJob(*job);
+}
+
+int
+Launcher_cpp::restoreJob(const std::string& dumpedJob)
+{
+ LAUNCHER_MESSAGE("restore Job");
+ Launcher::Job * new_job=NULL;
+ int jobId = -1;
+ try
{
- LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
- throw LauncherException("Cannot find the job, is it created ?");
+ {
+ new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
+ }
+ if(new_job)
+ {
+ jobId = addJob(new_job);
+ if(jobId < 0)
+ delete new_job;
+ }
}
-
- it_job->second->stopJob();
+ catch(const LauncherException &ex)
+ {
+ LAUNCHER_INFOS("Cannot load the job. Exception: " << ex.msg.c_str());
+ if(new_job)
+ delete new_job;
+ }
+ return jobId;
}
//=============================================================================
"(libBatch was not present at compilation time)");
}
+std::string
+Launcher_cpp::dumpJob(int job_id)
+{
+ LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot dump job!!!");
+ throw LauncherException("Method Launcher_cpp::dumpJob is not available "
+ "(libBatch was not present at compilation time)");
+ return "";
+}
+
+int
+Launcher_cpp::restoreJob(const std::string& dumpedJob)
+{
+ LAUNCHER_INFOS("Launcher compiled without LIBBATCH - cannot restore job!!!");
+ throw LauncherException("Method Launcher_cpp::restoreJob is not available "
+ "(libBatch was not present at compilation time)");
+ return 0;
+}
+
long
Launcher_cpp::createJobWithFile( const std::string xmlExecuteFile, std::string clusterName)
{
return _launcher_job_map;
}
-void
-Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
+#ifdef WITH_LIBBATCH
+Batch::BatchManager*
+Launcher_cpp::getBatchManager(Launcher::Job * job)
{
+ Batch::BatchManager* result = nullptr;
int job_id = job->getNumber();
// Select a resource for the job
}
// Step 2: We can now add a Factory if the resource is correctly define
-#ifdef WITH_LIBBATCH
std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
if(it == _batchmap.end())
{
// Warning cannot write on one line like this, because map object is constructed before
// the method is called...
//_batchmap[job_id] = FactoryBatchManager(resource_definition);
- Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
- _batchmap[job_id] = batch_client;
+ result = FactoryBatchManager(resource_definition);
+ _batchmap[job_id] = result;
}
catch(const LauncherException &ex)
{
throw LauncherException(ex.message);
}
}
-#endif
+ else
+ result = it->second;
+ return result;
}
+#endif
void
Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
{
// Step 0: Calculated job_id
- pthread_mutex_lock(_job_cpt_mutex);
- int job_id = _job_cpt;
+ new_job->setNumber(_job_cpt);
_job_cpt++;
- new_job->setNumber(job_id);
- pthread_mutex_unlock(_job_cpt_mutex);
+#ifdef WITH_LIBBATCH
// Step 1: check if resource is already in the map
- createBatchManagerForJob(new_job);
+ Batch::BatchManager * bm = getBatchManager(new_job);
// Step 2: add the job to the batch manager
-#ifdef WITH_LIBBATCH
try
{
- Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
- new_job->getReference());
+ Batch::JobId batch_manager_job_id = bm->addJob(*(new_job->getBatchJob()),
+ new_job->getReference());
new_job->setBatchManagerJobId(batch_manager_job_id);
}
catch(const Batch::GenericException &ex)
// Step 3: add job to launcher map
std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(new_job->getNumber());
if (it_job == _launcher_job_map.end())
+ {
_launcher_job_map[new_job->getNumber()] = new_job;
+ }
else
{
LAUNCHER_INFOS("A job as already the same id: " << new_job->getNumber());
#endif
}
+int
+Launcher_cpp::addJob(Launcher::Job * new_job)
+{
+ string job_state = new_job->getState();
+ int jobId = -1;
+ if (job_state == "CREATED")
+ {
+ // In this case, we ignore run_part information
+ createJob(new_job);
+ jobId = new_job->getNumber();
+ }
+ else if (job_state == "QUEUED" ||
+ job_state == "RUNNING" ||
+ job_state == "IN_PROCESS" ||
+ job_state == "PAUSED")
+ {
+ addJobDirectlyToMap(new_job);
+ jobId = new_job->getNumber();
+
+ // We check that the BatchManager could resume the job
+#ifdef WITH_LIBBATCH
+ if (new_job->getBatchManagerJobId().getReference() != new_job->getReference())
+ {
+ LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR");
+ new_job->setState("ERROR");
+ }
+#endif
+ }
+ else if (job_state == "FINISHED" ||
+ job_state == "FAILED" ||
+ job_state == "ERROR")
+ {
+ // We add run_part information
+ addJobDirectlyToMap(new_job);
+ jobId = new_job->getNumber();
+ }
+ else
+ {
+ LAUNCHER_INFOS("A bad job is found, state unknown " << job_state);
+ jobId = -1;
+ }
+ return jobId;
+}
+
list<int>
Launcher_cpp::loadJobs(const char* jobs_file)
{
for (it_job = jobs_list.begin(); it_job != jobs_list.end(); it_job++)
{
Launcher::Job * new_job = *it_job;
- string job_state = new_job->getState();
-
+ int jobId = -1;
try
{
- if (job_state == "CREATED")
- {
- // In this case, we ignore run_part information
- createJob(new_job);
- new_jobs_id_list.push_back(new_job->getNumber());
- }
- else if (job_state == "QUEUED" ||
- job_state == "RUNNING" ||
- job_state == "IN_PROCESS" ||
- job_state == "PAUSED")
- {
- addJobDirectlyToMap(new_job);
- new_jobs_id_list.push_back(new_job->getNumber());
-
- // Step 4: We check that the BatchManager could resume
- // the job
-#ifdef WITH_LIBBATCH
- if (new_job->getBatchManagerJobId().getReference() != new_job->getReference())
- {
- LAUNCHER_INFOS("BatchManager type cannot resume a job - job state is set to ERROR");
- new_job->setState("ERROR");
- }
-#endif
- }
- else if (job_state == "FINISHED" ||
- job_state == "FAILED" ||
- job_state == "ERROR")
- {
- // Step 2: We add run_part information
- addJobDirectlyToMap(new_job);
- new_jobs_id_list.push_back(new_job->getNumber());
- }
+ jobId = addJob(new_job);
+ if(jobId >= 0)
+ new_jobs_id_list.push_back(jobId);
else
- {
- LAUNCHER_INFOS("A bad job is found, state unknown " << job_state);
delete new_job;
- }
}
catch(const LauncherException &ex)
{
{
// Create a sorted list from the internal job map
list<const Launcher::Job *> jobs_list;
- for (int i=0; i<_job_cpt; i++)
{
- map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
- if (it_job != _launcher_job_map.end())
- jobs_list.push_back(it_job->second);
+ for (int i=0; i<_job_cpt; i++)
+ {
+ map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
+ if (it_job != _launcher_job_map.end())
+ jobs_list.push_back(it_job->second);
+ }
}
-
// Save the jobs in XML file
Launcher::XML_Persistence::saveJobs(jobs_file, jobs_list);
}
+
+Launcher::Job *
+Launcher_cpp::findJob(int job_id)
+{
+ std::map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(job_id);
+ if (it_job == _launcher_job_map.end())
+ {
+ LAUNCHER_INFOS("Cannot find the job, is it created ? job number: " << job_id);
+ throw LauncherException("Cannot find the job, is it created ?");
+ }
+ Launcher::Job * job = it_job->second;
+ return job;
+}
\ No newline at end of file