-// Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
+// Copyright (C) 2007-2012 CEA/DEN, EDF R&D, OPEN CASCADE
//
// Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
#include <signal.h>
#include <errno.h>
#include <string.h>
+
+#include "Batch_Constants.hxx"
#include "Batch_IOMutex.hxx"
#include "Batch_BatchManager_Local.hxx"
#include "Batch_RunTimeException.hxx"
// Constructor: initializes the BatchManager base class, the job id counter,
// the mutex locked around accesses to _threads, and the condition variable.
- BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
- CommunicationProtocolType protocolType)
- : BatchManager(parent, host), _connect(0),
- _protocol(CommunicationProtocol::getInstance(protocolType)),
+ BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
+ const char * username,
+ CommunicationProtocolType protocolType, const char * mpiImpl)
+ : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
_idCounter(0)
{
pthread_mutex_init(&_threads_mutex, NULL);
- pthread_cond_init(&_threadLaunchCondition, NULL);
+ pthread_cond_init(&_threadSyncCondition, NULL);
}
// Destructor: cancels every job thread whose state is neither FINISHED nor FAILED,
// waiting on the condition variable after each cancellation, then destroys the
// mutex and the condition variable.
BatchManager_Local::~BatchManager_Local()
{
+ for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
+ pthread_mutex_lock(&_threads_mutex);
+ string state = iter->second.param[STATE];
+ if (state != FINISHED && state != FAILED) {
+ UNDER_LOCK( cout << "Warning: Job " << iter->first <<
+ " is not finished, it will now be canceled." << endl );
+ pthread_cancel(iter->second.thread_id);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex); // Woken by the signal sent from setFailedOnCancel. NOTE(review): single wait with no predicate re-check, so a spurious wakeup would end the wait early - confirm this is acceptable.
+ }
+ pthread_mutex_unlock(&_threads_mutex);
+ }
pthread_mutex_destroy(&_threads_mutex);
- pthread_cond_destroy(&_threadLaunchCondition);
- }
-
- const CommunicationProtocol & BatchManager_Local::getProtocol() const
- {
- return _protocol;
+ pthread_cond_destroy(&_threadSyncCondition);
}
// Methode pour le controle des jobs : soumet un job au gestionnaire
const JobId BatchManager_Local::submitJob(const Job & job)
{
+ // export input files in the working directory of the execution host
+ exportInputFiles(job);
+
Job_Local jobLocal = job;
Id id = _idCounter++;
ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
throw RunTimeException("Can't create new thread in BatchManager_Local");
}
- pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
pthread_mutex_unlock(&_threads_mutex);
ostringstream id_sst;
istringstream iss(jobid.getReference());
iss >> id;
- // On retrouve le thread_id du thread
- pthread_t thread_id;
-
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- if (_threads.find(id) != _threads.end())
- thread_id = _threads[id].thread_id;
+ bool idFound = (_threads.find(id) != _threads.end());
+ if (idFound) {
+ string state = _threads[id].param[STATE];
+ if (state != FINISHED && state != FAILED) {
+ pthread_cancel(_threads[id].thread_id);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+ } else {
+ cout << "Cannot delete job " << jobid.getReference() <<
+ ". Job is already finished." << endl;
+ }
+ }
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
- cancel(thread_id);
+ if (!idFound)
+ throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
}
// Methode pour le controle des jobs : suspend un job en file d'attente
}
+ // This manager cannot resume (re-attach to) an existing job.
+ // The job state is therefore forced to error: to do so, no real id is given
+ // to the JobId (its reference is "undefined"); job and reference are not used.
+ const Batch::JobId
+ BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
+ {
+ return JobId(this, "undefined");
+ }
// Methode pour le controle des jobs : teste si un job est present en machine
bool BatchManager_Local::isRunning(const JobId & jobid)
istringstream iss(jobid.getReference());
iss >> id;
- Status status;
-
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- status = _threads[id].status;
+ bool running = (_threads[id].param[STATE].str() == RUNNING);
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
- return (status == RUNNING);
- }
-
- // Methode de destruction d'un job
- void BatchManager_Local::cancel(pthread_t thread_id)
- {
- pthread_cancel(thread_id);
+ return running;
}
if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
#endif
- exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
+ string fileToExecute = param[EXECUTABLE].str();
+ string::size_type p1 = fileToExecute.find_last_of("/");
+ string fileNameToExecute = fileToExecute.substr(p1+1);
+
+ exec_sub_cmd << "cd " << param[WORKDIR] << " && ./" << fileNameToExecute;
if (param.find(ARGUMENTS) != param.end()) {
Versatile V = param[ARGUMENTS];
user = string(it->second);
}
- return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
+ return _protocol.getExecCommandArgs(exec_sub_cmd.str(), _hostname, user);
}
// Methode d'execution du thread
void * BatchManager_Local::ThreadAdapter::run(void * arg)
{
+ ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+
#ifndef WIN32
// On bloque tous les signaux pour ce thread
sigset_t setmask;
// This function will be called automatically when a cancellation request
// is acted upon by pthread_testcancel()
Process child;
- pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
-
- ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
-
-
-
-
- // Le code retour cumule (ORed) de tous les appels
- // Nul en cas de reussite de l'ensemble des operations
- int rc = 0;
-
- // Cette table contient la liste des fichiers a detruire a la fin du processus
- std::vector<string> files_to_delete;
-
-
-
- // On copie les fichiers d'entree pour le fils
- const Parametre param = p_ta->_job.getParametre();
- Parametre::const_iterator it;
-
- // On initialise la variable workdir a la valeur du Current Working Directory
- char * cwd =
-#ifdef WIN32
- _getcwd(NULL, 0);
-#else
- new char [PATH_MAX];
- getcwd(cwd, PATH_MAX);
-#endif
- string workdir = cwd;
- delete [] cwd;
-
- if ( (it = param.find(WORKDIR)) != param.end() ) {
- workdir = static_cast<string>( (*it).second );
- }
-
- string executionhost = string(param[EXECUTIONHOST]);
- string user;
- if ( (it = param.find(USER)) != param.end() ) {
- user = string(it->second);
- }
-
- if ( (it = param.find(INFILE)) != param.end() ) {
- Versatile V = (*it).second;
- Versatile::iterator Vit;
-
- for(Vit=V.begin(); Vit!=V.end(); Vit++) {
- CoupleType cpt = *static_cast< CoupleType * >(*Vit);
- Couple cp = cpt;
- string local = cp.getLocal();
- string remote = cp.getRemote();
-
- std::cerr << workdir << std::endl;
- std::cerr << remote << std::endl;
-
- int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
- workdir + "/" + remote,
- executionhost, user);
- if (status) {
- // Echec de la copie
- rc |= 1;
- } else {
- // On enregistre le fichier comme etant a detruire
- files_to_delete.push_back(workdir + "/" + remote);
- }
-
- }
- }
-
-
-
+ pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
+ pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
// On forke/exec un nouveau process pour pouvoir controler le fils
// (plus finement qu'avec un appel system)
}
#endif
+ pthread_mutex_lock(&p_ta->_bm._threads_mutex);
- // On copie les fichiers de sortie du fils
- if ( (it = param.find(OUTFILE)) != param.end() ) {
- Versatile V = (*it).second;
- Versatile::iterator Vit;
-
- for(Vit=V.begin(); Vit!=V.end(); Vit++) {
- CoupleType cpt = *static_cast< CoupleType * >(*Vit);
- Couple cp = cpt;
- string local = cp.getLocal();
- string remote = cp.getRemote();
-
- int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
- executionhost, user,
- local, "", "");
- if (status) {
- // Echec de la copie
- rc |= 1;
- } else {
- // On enregistre le fichier comme etant a detruire
- files_to_delete.push_back(workdir + "/" + remote);
- }
-
- }
- }
-
- // On efface les fichiers d'entree et de sortie du fils si les copies precedentes ont reussi
- // ou si la creation du fils n'a pu avoir lieu
- if ( (rc == 0) || (child < 0) ) {
- std::vector<string>::const_iterator it;
- for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
- p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
-/* string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
- UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
-#ifdef WIN32
- remove_cmd = string("\"") + remove_cmd + string("\"");
-#endif
- system(remove_cmd.c_str());*/
- }
- }
-
-
+ // Set the job state to FINISHED or FAILED
+ p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
// On retire la fonction de nettoyage de la memoire
pthread_cleanup_pop(0);
// On retire la fonction de suppression du fils
pthread_cleanup_pop(0);
+ // remove setFailedOnCancel function from cancel stack
+ pthread_cleanup_pop(0);
+ pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
// On invoque la fonction de nettoyage de la memoire
delete_on_exit(arg);
#ifndef WIN32
_bm._threads[_id].pid = child;
#endif
- _bm._threads[_id].status = RUNNING;
+ _bm._threads[_id].hasFailed = false;
_bm._threads[_id].param = param;
_bm._threads[_id].env = env;
_bm._threads[_id].command_queue.push(NOP);
// Unlock the master thread. From here, all shared variables must be protected
// from concurrent access
- pthread_cond_signal(&_bm._threadLaunchCondition);
+ pthread_cond_signal(&_bm._threadSyncCondition);
// on boucle en attendant que le fils ait termine
while (1) {
#ifdef WIN32
DWORD exitCode;
- BOOL res = GetExitCodeProcess(child, &exitCode);
+ GetExitCodeProcess(child, &exitCode);
if (exitCode != STILL_ACTIVE) {
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = DONE;
- _bm._threads[_id].param[STATE] = Batch::FINISHED;
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
break;
}
// Le fils est simplement stoppe
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = STOPPED;
_bm._threads[_id].param[STATE] = Batch::PAUSED;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
}
else {
// Le fils est termine, on sort de la boucle et du if englobant
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = DONE;
- _bm._threads[_id].param[STATE] = Batch::FINISHED;
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
break;
}
// Le fils a disparu ...
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = DEAD;
- _bm._threads[_id].param[STATE] = Batch::FAILED;
+ _bm._threads[_id].hasFailed = true;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
// On regarde si le fils n'a pas depasse son temps (wallclock time)
time_t child_currenttime = time(NULL);
- time_t child_elapsedtime = child_currenttime - child_starttime;
+ long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
if (param.find(MAXWALLTIME) != param.end()) {
- int maxwalltime = param[MAXWALLTIME];
+ long maxwalltime = param[MAXWALLTIME];
// cout << "child_starttime = " << child_starttime << endl
// << "child_currenttime = " << child_currenttime << endl
// << "child_elapsedtime = " << child_elapsedtime << endl
// << "maxwalltime = " << maxwalltime << endl
// << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
- if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
+ if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
// @@@ --------> SECTION CRITIQUE <-------- @@@
- } else if (child_elapsedtime > maxwalltime ) {
+ } else if (child_elapsedtime_minutes > maxwalltime ) {
UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
// On positionne les limites systeme imposees au fils
+ // This part is deactivated because those limits should be set on the job process, not on
+ // the ssh process. If it is done properly one day, beware of the types used (int is not enough)
+ /*
if (param.find(MAXCPUTIME) != param.end()) {
int maxcputime = param[MAXCPUTIME];
struct rlimit limit;
if (param.find(MAXRAMSIZE) != param.end()) {
int maxramsize = param[MAXRAMSIZE];
struct rlimit limit;
- limit.rlim_cur = maxramsize * 1024;
- limit.rlim_max = int(maxramsize * 1.1) * 1024;
+ limit.rlim_cur = maxramsize * 1024 * 1024;
+ limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
setrlimit(RLIMIT_AS, &limit);
}
+ */
//char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
// On execute la commande du fils
- int result = execve(argv[0], argv, envp);
+ execve(argv[0], argv, envp);
UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
// No need to deallocate since nothing happens after a successful exec
char * buffer = strdup(comstr.c_str());
// launch the new process
- BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
+ bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
if (buffer) free(buffer);
delete p_ta;
}
+ void BatchManager_Local::setFailedOnCancel(void * arg) // Cleanup handler registered with pthread_cleanup_push in ThreadAdapter::run; arg is the ThreadAdapter of the job thread.
+ {
+ ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+ pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
+ p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED; // Record the job as FAILED in the manager's thread table, under the mutex.
+ pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
+
+ // Wake up the master thread waiting on _threadSyncCondition (the waits that follow pthread_cancel). From here, the batch manager instance should not be used.
+ pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
+ }
+
}