using namespace std;
-class ReadLock
-{
-public:
- ReadLock(pthread_rwlock_t * lock)
- : _lock(lock)
- {
-// pthread_rwlock_rdlock(_lock);
- pthread_rwlock_wrlock(_lock);
- }
- ~ReadLock()
- {
- pthread_rwlock_unlock(_lock);
- }
-private:
- pthread_rwlock_t * _lock;
-};
-
-class WriteLock
-{
-public:
- WriteLock(pthread_rwlock_t * lock)
- : _lock(lock)
- {
- pthread_rwlock_wrlock(_lock);
- }
- ~WriteLock()
- {
- pthread_rwlock_unlock(_lock);
- }
-private:
- pthread_rwlock_t * _lock;
-};
-
//=============================================================================
/*!
* Constructor
{
LAUNCHER_MESSAGE("Launcher_cpp constructor");
_job_cpt = 0;
- _lock = new pthread_rwlock_t();
- pthread_rwlock_init(_lock, NULL);
}
//=============================================================================
for(it1=_batchmap.begin();it1!=_batchmap.end();it1++)
delete it1->second;
#endif
-
- pthread_rwlock_destroy(_lock);
- delete _lock;
}
#ifdef WITH_LIBBATCH
void
Launcher_cpp::createJob(Launcher::Job * new_job)
{
- WriteLock mutex(_lock);
LAUNCHER_MESSAGE("Creating a new job");
// Add job to the jobs map
new_job->setNumber(_job_cpt);
void
Launcher_cpp::launchJob(int job_id)
{
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Launch a job");
- // Check if job exist
+ // Check if job exists
Launcher::Job * job = findJob(job_id);
// Check job state (cannot launch a job already launched...)
throw LauncherException("Bad state of the job: " + job->getState());
}
- // Third step search batch manager for the job into the map -> instantiate one if does not exist
- std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
- if(it == _batchmap.end())
- {
- createBatchManagerForJob(job);
- }
+ Batch::BatchManager * bm = getBatchManager(job);
try {
- Batch::JobId batch_manager_job_id = _batchmap[job_id]->submitJob(*(job->getBatchJob()));
+ Batch::JobId batch_manager_job_id = bm->submitJob(*(job->getBatchJob()));
job->setBatchManagerJobId(batch_manager_job_id);
job->setState("QUEUED");
job->setReference(batch_manager_job_id.getReference());
const char *
Launcher_cpp::getJobState(int job_id)
{
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Get job state");
// Check if job exist
const char *
Launcher_cpp::getAssignedHostnames(int job_id)
{
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Get job assigned hostnames");
// Check if job exist
void
Launcher_cpp::getJobResults(int job_id, std::string directory)
{
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Get Job results");
Launcher::Job * job = findJob(job_id);
void
Launcher_cpp::clearJobWorkingDir(int job_id)
{
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Clear the remote working directory");
Launcher::Job * job = findJob(job_id);
Launcher_cpp::getJobDumpState(int job_id, std::string directory)
{
bool rtn;
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Get Job dump state");
Launcher::Job * job = findJob(job_id);
std::string directory)
{
bool rtn;
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Get working file " << work_file);
Launcher::Job * job = findJob(job_id);
void
Launcher_cpp::removeJob(int job_id)
{
- WriteLock mutex(_lock);
LAUNCHER_MESSAGE("Remove Job");
// Check if job exist
void
Launcher_cpp::stopJob(int job_id)
{
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("Stop Job");
Launcher::Job * job = findJob(job_id);
std::string
Launcher_cpp::dumpJob(int job_id)
{
- ReadLock mutex(_lock);
LAUNCHER_MESSAGE("dump Job");
Launcher::Job * job = findJob(job_id);
int jobId = -1;
try
{
- new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
+ {
+ new_job = Launcher::XML_Persistence::createJobFromString(dumpedJob);
+ }
if(new_job)
{
jobId = addJob(new_job);
return _launcher_job_map;
}
-void
-Launcher_cpp::createBatchManagerForJob(Launcher::Job * job)
+#ifdef WITH_LIBBATCH
+Batch::BatchManager*
+Launcher_cpp::getBatchManager(Launcher::Job * job)
{
+ Batch::BatchManager* result = nullptr;
int job_id = job->getNumber();
// Select a resource for the job
}
// Step 2: We can now add a Factory if the resource is correctly define
-#ifdef WITH_LIBBATCH
std::map<int, Batch::BatchManager *>::const_iterator it = _batchmap.find(job_id);
if(it == _batchmap.end())
{
// Warning cannot write on one line like this, because map object is constructed before
// the method is called...
//_batchmap[job_id] = FactoryBatchManager(resource_definition);
- Batch::BatchManager * batch_client = FactoryBatchManager(resource_definition);
- _batchmap[job_id] = batch_client;
+ result = FactoryBatchManager(resource_definition);
+ _batchmap[job_id] = result;
}
catch(const LauncherException &ex)
{
throw LauncherException(ex.message);
}
}
-#endif
+ else
+ result = it->second;
+ return result;
}
+#endif
void
Launcher_cpp::addJobDirectlyToMap(Launcher::Job * new_job)
{
- WriteLock mutex(_lock);
// Step 0: Calculated job_id
- int job_id = _job_cpt;
+ new_job->setNumber(_job_cpt);
_job_cpt++;
- new_job->setNumber(job_id);
+#ifdef WITH_LIBBATCH
// Step 1: check if resource is already in the map
- createBatchManagerForJob(new_job);
+ Batch::BatchManager * bm = getBatchManager(new_job);
// Step 2: add the job to the batch manager
-#ifdef WITH_LIBBATCH
try
{
- Batch::JobId batch_manager_job_id = _batchmap[job_id]->addJob(*(new_job->getBatchJob()),
- new_job->getReference());
+ Batch::JobId batch_manager_job_id = bm->addJob(*(new_job->getBatchJob()),
+ new_job->getReference());
new_job->setBatchManagerJobId(batch_manager_job_id);
}
catch(const Batch::GenericException &ex)
// Create a sorted list from the internal job map
list<const Launcher::Job *> jobs_list;
{
- ReadLock mutex(_lock);
for (int i=0; i<_job_cpt; i++)
{
map<int, Launcher::Job *>::const_iterator it_job = _launcher_job_map.find(i);
--- /dev/null
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# classic use case of a job
+import os
+import salome
+import tempfile
+import time
+import sys
+
+if __name__ == '__main__':
+ salome.salome_init()
+ launcher = salome.naming_service.Resolve('/SalomeLauncher')
+ job_params = salome.JobParameters()
+ job_params.resource_required = salome.ResourceParameters()
+ job_params.resource_required.name = "localhost"
+ job_params.resource_required.nb_proc = 1 # slurm: --ntasks
+
+ job_params.job_type = "command"
+ #cwd = os.getcwd()
+ cwd = sys.argv[1]
+ case_dir = tempfile.mkdtemp(prefix="test")
+ job_params.local_directory = cwd
+ job_params.job_file = "command.sh"
+ job_params.work_directory = os.path.join(case_dir, "run")
+ job_params.result_directory = os.path.join(case_dir, "result")
+ job_params.out_files = ["result.txt"]
+ job_params.wckey="P11U5:CARBONES"
+ job_params.job_name = "MyJob"
+ job_id = launcher.createJob(job_params)
+ launcher.launchJob(job_id)
+ job_string = launcher.dumpJob(job_id)
+ jobState = launcher.getJobState(job_id)
+ while jobState != 'FINISHED' and jobState != 'FAILED':
+ time.sleep(1)
+ jobState = launcher.getJobState(job_id)
+ launcher.getJobResults(job_id, '')
+ launcher.getJobWorkFile(job_id, "result.txt", '')
+ launcher.clearJobWorkingDir(job_id)
+ launcher.removeJob(job_id)
+ new_id = launcher.restoreJob(job_string)
+ job_params_bis = launcher.getJobParameters(new_id)
+ jobState = launcher.getJobState(new_id)
+ launcher.removeJob(new_id)
+ launcher.saveJobs(os.path.join(case_dir,"savejobs.xml"))