-// Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
+// Copyright (C) 2007-2011 CEA/DEN, EDF R&D, OPEN CASCADE
//
// Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
* Date : Thu Nov 6 10:17:22 2003
* Projet : Salome 2
*
+* Refactored by Renaud Barate (EDF R&D) in September 2009 to use
+* CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
+* managers.
*/
#include <iostream>
#include <sys/types.h>
#ifdef WIN32
-# include <direct.h>
-#include "Batch_RunTimeException.hxx"
+#include <direct.h>
#else
-# include <sys/wait.h>
-# include <unistd.h>
+#include <sys/wait.h>
+#include <unistd.h>
#endif
#include <ctime>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
+
+#include "Batch_Constants.hxx"
#include "Batch_IOMutex.hxx"
#include "Batch_BatchManager_Local.hxx"
+#include "Batch_RunTimeException.hxx"
using namespace std;
// Constructeur
- BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host)
- throw(InvalidArgumentException,ConnexionFailureException)
- : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(),
- _thread_id_id_association_cond(), _thread_id_id_association()
+ BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
+ CommunicationProtocolType protocolType)
+ : BatchManager(parent, host), _connect(0),
+ _protocol(CommunicationProtocol::getInstance(protocolType)),
+ _idCounter(0)
{
pthread_mutex_init(&_threads_mutex, NULL);
- pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
- pthread_cond_init(&_thread_id_id_association_cond, NULL);
+ pthread_cond_init(&_threadSyncCondition, NULL);
}
// Destructeur
BatchManager_Local::~BatchManager_Local()
{
+ for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
+ pthread_mutex_lock(&_threads_mutex);
+ string state = iter->second.param[STATE];
+ if (state != FINISHED && state != FAILED) {
+ UNDER_LOCK( cout << "Warning: Job " << iter->first <<
+ " is not finished, it will now be canceled." << endl );
+ pthread_cancel(iter->second.thread_id);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+ }
+ pthread_mutex_unlock(&_threads_mutex);
+ }
pthread_mutex_destroy(&_threads_mutex);
- pthread_mutex_destroy(&_thread_id_id_association_mutex);
- pthread_cond_destroy(&_thread_id_id_association_cond);
+ pthread_cond_destroy(&_threadSyncCondition);
+ }
+
+ const CommunicationProtocol & BatchManager_Local::getProtocol() const
+ {
+ return _protocol;
}
// Methode pour le controle des jobs : soumet un job au gestionnaire
const JobId BatchManager_Local::submitJob(const Job & job)
{
Job_Local jobLocal = job;
+ Id id = _idCounter++;
+ ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
+
+ // Les attributs du thread a sa creation
+ pthread_attr_t thread_attr;
+ pthread_attr_init(&thread_attr);
+ pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
+
+ // Creation du thread qui va executer la commande systeme qu'on lui passe
+ pthread_t thread_id;
+ pthread_mutex_lock(&_threads_mutex);
+ int rc = pthread_create(&thread_id,
+ &thread_attr,
+ &ThreadAdapter::run,
+ static_cast<void *>(p_ta));
- pthread_t thread_id = submit(jobLocal);
+ // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
+ pthread_attr_destroy(&thread_attr);
- ostringstream oss;
- oss << getIdByThread_id(thread_id);
+ if (rc != 0) {
+ pthread_mutex_unlock(&_threads_mutex);
+ throw RunTimeException("Can't create new thread in BatchManager_Local");
+ }
- JobId id(this, oss.str());
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+ pthread_mutex_unlock(&_threads_mutex);
- return id;
+ ostringstream id_sst;
+ id_sst << id;
+ return JobId(this, id_sst.str());
}
// Methode pour le controle des jobs : retire un job du gestionnaire
istringstream iss(jobid.getReference());
iss >> id;
- // On retrouve le thread_id du thread
- pthread_t thread_id;
-
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- if (_threads.find(id) != _threads.end())
- thread_id = _threads[id].thread_id;
+ bool idFound = (_threads.find(id) != _threads.end());
+ if (idFound) {
+ string state = _threads[id].param[STATE];
+ if (state != FINISHED && state != FAILED) {
+ pthread_cancel(_threads[id].thread_id);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+ } else {
+ cout << "Cannot delete job " << jobid.getReference() <<
+ ". Job is already finished." << endl;
+ }
+ }
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
- cancel(thread_id);
+ if (!idFound)
+ throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
}
// Methode pour le controle des jobs : suspend un job en file d'attente
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- param = _threads[id].param;
- env = _threads[id].env;
+ std::map<Id, Child >::iterator pos = _threads.find(id);
+ bool found = (pos != _threads.end());
+ if (found) {
+ param = pos->second.param;
+ env = pos->second.env;
+ }
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
+ if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
+
JobInfo_Local ji(param, env);
return ji;
}
+ // Ce manager ne peut pas reprendre un job
+ // On force donc l'état du job à erreur - pour cela on ne donne pas d'Id
+ // au JobId
+ const Batch::JobId
+ BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
+ {
+ return JobId(this, "undefined");
+ }
// Methode pour le controle des jobs : teste si un job est present en machine
bool BatchManager_Local::isRunning(const JobId & jobid)
istringstream iss(jobid.getReference());
iss >> id;
- Status status;
-
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- status = _threads[id].status;
+ bool running = (_threads[id].param[STATE].str() == RUNNING);
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
- return (status == RUNNING);
+ return running;
}
- // Methode d'execution d'un job
- pthread_t BatchManager_Local::submit(const Job_Local & job)
+ vector<string> BatchManager_Local::exec_command(const Parametre & param) const
{
- // L'id du thread a creer
- pthread_t thread_id =
+ ostringstream exec_sub_cmd;
+
#ifdef WIN32
- {0,0};
-#else
- 0;
+ char drive[_MAX_DRIVE];
+ _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
+ if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
#endif
- // Les attributs du thread a sa creation
- pthread_attr_t thread_attr;
- pthread_attr_init(&thread_attr);
- pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
-
- ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
+ exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
- // Creation du thread qui va executer la commande systeme qu'on lui passe
- int rc = pthread_create(&thread_id,
- &thread_attr,
- &ThreadAdapter::run,
- static_cast<void *>(p_ta));
- if (rc) {
+ if (param.find(ARGUMENTS) != param.end()) {
+ Versatile V = param[ARGUMENTS];
+ for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
+ StringType argt = * static_cast<StringType *>(*it);
+ string arg = argt;
+ exec_sub_cmd << " " << arg;
+ }
}
- // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
- pthread_attr_destroy(&thread_attr);
-
- return thread_id;
- }
-
-
- // Methode de destruction d'un job
- void BatchManager_Local::cancel(pthread_t thread_id)
- {
- pthread_cancel(thread_id);
- }
-
-
- // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
- // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
- // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
- // Thread_id / Id (_thread_id_id_association_mutex)
- BatchManager_Local::Id BatchManager_Local::nextId()
- {
- static Id id = 0;
- Id nextId = id++;
- //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
- return nextId;
- }
-
-
- // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
- BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
- {
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_thread_id_id_association_mutex);
- bool threadIdFound = false;
- std::list<struct ThreadIdIdAssociation>::iterator it;
- while (!threadIdFound) {
- for (it = _thread_id_id_association.begin() ;
- it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
- it++);
- if (it == _thread_id_id_association.end())
- pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
- else
- threadIdFound = true;
+ if (param.find(INFILE) != param.end()) {
+ Versatile V = param[INFILE];
+ for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
+ Couple cpl = * static_cast<CoupleType*>(*it);
+ string remote = cpl.getRemote();
+ if (remote == "stdin")
+ exec_sub_cmd << " <stdin";
+ }
}
- Id id = it->id;
- _thread_id_id_association.erase(it);
-
- pthread_mutex_unlock(&_thread_id_id_association_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
-
- //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
- return id;
- }
-
-
- // Associe un Thread_id a un Id nouvellement cree
- BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
- {
- Id id = -1;
+ if (param.find(OUTFILE) != param.end()) {
+ Versatile V = param[OUTFILE];
+ for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
+ Couple cpl = * static_cast<CoupleType*>(*it);
+ string remote = cpl.getRemote();
+ if (remote == "stdout") exec_sub_cmd << " 1>stdout";
+ if (remote == "stderr") exec_sub_cmd << " 2>stderr";
+ }
+ }
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_thread_id_id_association_mutex);
- std::list<struct ThreadIdIdAssociation>::iterator it;
- for (it = _thread_id_id_association.begin() ;
- it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
- it++);
- if (it == _thread_id_id_association.end()) {
- struct ThreadIdIdAssociation newAssociation;
- id = newAssociation.id = nextId();
- newAssociation.threadId = thread_id;
- _thread_id_id_association.push_back(newAssociation);
- pthread_cond_signal(&_thread_id_id_association_cond);
-
- } else {
- UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
+ string user;
+ Parametre::const_iterator it = param.find(USER);
+ if (it != param.end()) {
+ user = string(it->second);
}
- pthread_mutex_unlock(&_thread_id_id_association_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
- return id;
+ return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
}
+
// Constructeur de la classe ThreadAdapter
- BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
- _bm(bm), _job(job)
+ BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
+ _bm(bm), _job(job), _id(id)
{
// Nothing to do
}
// Methode d'execution du thread
void * BatchManager_Local::ThreadAdapter::run(void * arg)
{
+ ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+
#ifndef WIN32
// On bloque tous les signaux pour ce thread
sigset_t setmask;
// Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
// sera prise en compte par pthread_testcancel()
Process child;
- pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
-
- ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
-
-
+ pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
+ pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
// Le code retour cumule (ORed) de tous les appels
string local = cp.getLocal();
string remote = cp.getRemote();
- string copy_cmd = p_ta->getBatchManager().copy_command("", "", local, user,
- executionhost, workdir + "/" + remote);
- UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
-#ifdef WIN32
- copy_cmd = string("\"") + copy_cmd + string("\"");
-#endif
+ std::cerr << workdir << std::endl;
+ std::cerr << remote << std::endl;
- if (system(copy_cmd.c_str()) ) {
+ int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
+ workdir + "/" + remote,
+ executionhost, user);
+ if (status) {
// Echec de la copie
rc |= 1;
} else {
// On forke/exec un nouveau process pour pouvoir controler le fils
// (plus finement qu'avec un appel system)
// int rc = system(commande.c_str());
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
+ //execv("/usr/bin/ssh", parmList);
#ifdef WIN32
child = p_ta->launchWin32ChildProcess();
p_ta->pere(child);
string local = cp.getLocal();
string remote = cp.getRemote();
- string copy_cmd = p_ta->getBatchManager().copy_command(user, executionhost, workdir + "/" + remote,
- "", "", local);
- UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
-#ifdef WIN32
- copy_cmd = string("\"") + copy_cmd + string("\"");
-#endif
-
- if (system(copy_cmd.c_str()) ) {
+ int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
+ executionhost, user,
+ local, "", "");
+ if (status) {
// Echec de la copie
rc |= 1;
} else {
if ( (rc == 0) || (child < 0) ) {
std::vector<string>::const_iterator it;
for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
- string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
+ p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
+/* string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
#ifdef WIN32
remove_cmd = string("\"") + remove_cmd + string("\"");
#endif
- system(remove_cmd.c_str());
+ system(remove_cmd.c_str());*/
}
}
+ pthread_mutex_lock(&p_ta->_bm._threads_mutex);
+ // Set the job state to FINISHED or FAILED
+ p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
// On retire la fonction de nettoyage de la memoire
pthread_cleanup_pop(0);
// On retire la fonction de suppression du fils
pthread_cleanup_pop(0);
+ // remove setFailedOnCancel function from cancel stack
+ pthread_cleanup_pop(0);
+ pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
// On invoque la fonction de nettoyage de la memoire
delete_on_exit(arg);
// On enregistre le fils dans la table des threads
pthread_t thread_id = pthread_self();
- Id id = _bm.registerThread_id(thread_id);
Parametre param = _job.getParametre();
Environnement env = _job.getEnvironnement();
- ostringstream thread_id_sst;
- thread_id_sst << id;
- param[ID] = thread_id_sst.str();
- param[STATE] = "Running";
+ ostringstream id_sst;
+ id_sst << _id;
+ param[ID] = id_sst.str();
+ param[STATE] = Batch::RUNNING;
#ifndef WIN32
param[PID] = child;
#endif
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].thread_id = thread_id;
+ _bm._threads[_id].thread_id = thread_id;
#ifndef WIN32
- _bm._threads[id].pid = child;
+ _bm._threads[_id].pid = child;
#endif
- _bm._threads[id].status = RUNNING;
- _bm._threads[id].param = param;
- _bm._threads[id].env = env;
- _bm._threads[id].command_queue.push(NOP);
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
-
+ _bm._threads[_id].hasFailed = false;
+ _bm._threads[_id].param = param;
+ _bm._threads[_id].env = env;
+ _bm._threads[_id].command_queue.push(NOP);
+ // Unlock the master thread. From here, all shared variables must be protected
+ // from concurrent access
+ pthread_cond_signal(&_bm._threadSyncCondition);
// on boucle en attendant que le fils ait termine
DWORD exitCode;
BOOL res = GetExitCodeProcess(child, &exitCode);
if (exitCode != STILL_ACTIVE) {
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = DONE;
- _bm._threads[id].param[STATE] = "Done";
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
break;
}
int child_rc = 0;
pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
if (child_wait_rc > 0) {
+ UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
+#ifdef WIFCONTINUED
+ UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
+#endif
if (WIFSTOPPED(child_rc)) {
// NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
// soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
// Le fils est simplement stoppe
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = STOPPED;
- _bm._threads[id].param[STATE] = "Stopped";
+ _bm._threads[_id].param[STATE] = Batch::PAUSED;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
}
else {
// Le fils est termine, on sort de la boucle et du if englobant
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = DONE;
- _bm._threads[id].param[STATE] = "Done";
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
break;
}
// Le fils a disparu ...
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = DEAD;
- _bm._threads[id].param[STATE] = "Dead";
+ _bm._threads[_id].hasFailed = true;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
// On regarde si le fils n'a pas depasse son temps (wallclock time)
time_t child_currenttime = time(NULL);
- time_t child_elapsedtime = child_currenttime - child_starttime;
+ long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
if (param.find(MAXWALLTIME) != param.end()) {
- int maxwalltime = param[MAXWALLTIME];
+ long maxwalltime = param[MAXWALLTIME];
// cout << "child_starttime = " << child_starttime << endl
// << "child_currenttime = " << child_currenttime << endl
// << "child_elapsedtime = " << child_elapsedtime << endl
// << "maxwalltime = " << maxwalltime << endl
// << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
- if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
- UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
+ if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
+ UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- if (_bm._threads.find(id) != _bm._threads.end())
- _bm._threads[id].command_queue.push(KILL);
+ if (_bm._threads.find(_id) != _bm._threads.end())
+ _bm._threads[_id].command_queue.push(KILL);
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
- } else if (child_elapsedtime > maxwalltime ) {
- UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
+ } else if (child_elapsedtime_minutes > maxwalltime ) {
+ UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- if (_bm._threads.find(id) != _bm._threads.end())
- _bm._threads[id].command_queue.push(TERM);
+ if (_bm._threads.find(_id) != _bm._threads.end())
+ _bm._threads[_id].command_queue.push(TERM);
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
}
// On regarde s'il y a quelque chose a faire dans la queue de commande
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- if (_bm._threads.find(id) != _bm._threads.end()) {
- while (_bm._threads[id].command_queue.size() > 0) {
- Commande cmd = _bm._threads[id].command_queue.front();
- _bm._threads[id].command_queue.pop();
+ if (_bm._threads.find(_id) != _bm._threads.end()) {
+ while (_bm._threads[_id].command_queue.size() > 0) {
+ Commande cmd = _bm._threads[_id].command_queue.front();
+ _bm._threads[_id].command_queue.pop();
switch (cmd) {
case NOP:
Parametre param = _job.getParametre();
Parametre::iterator it;
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+ //int result = execv("/usr/bin/ssh", parmList);
+ //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+ //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
try {
// EXECUTABLE is MANDATORY, if missing, we exit with failure notification
- char * execpath = NULL;
+ vector<string> command;
if (param.find(EXECUTABLE) != param.end()) {
- string executable = _bm.exec_command(param);
- execpath = new char [executable.size() + 1];
- strncpy(execpath, executable.c_str(), executable.size() + 1);
+ command = _bm.exec_command(param);
} else exit(1);
- string debug_command = execpath;
-
- string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
-
- char ** argv = NULL;
- if (param.find(ARGUMENTS) != param.end()) {
- Versatile V = param[ARGUMENTS];
-
- argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
-
- argv[0] = new char [name.size() + 1];
- strncpy(argv[0], name.c_str(), name.size() + 1);
-
- debug_command += string(" # ") + argv[0];
-
- int i = 1;
- for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
- StringType argt = * static_cast<StringType *>(*it);
- string arg = argt;
- argv[i] = new char [arg.size() + 1];
- strncpy(argv[i], arg.c_str(), arg.size() + 1);
- debug_command += string(" # ") + argv[i];
- }
-
- // assert (i == V.size() + 1)
- argv[i] = NULL;
+ // Build the argument array argv from the command
+ char ** argv = new char * [command.size() + 1];
+ string comstr;
+ for (string::size_type i=0 ; i<command.size() ; i++) {
+ argv[i] = new char[command[i].size() + 1];
+ strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
+ if (i>0) comstr += " # ";
+ comstr += command[i];
}
+ argv[command.size()] = NULL;
- UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
-
-
+ UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
+ UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
// Create the environment for the new process. Note (RB): Here we change the environment for
// the process launched in local. It would seem more logical to set the environment for the
envp[i] = NULL;
}
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+ //int result = execv("/usr/bin/ssh", parmList);
+ //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+ //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
if (param.find(MAXRAMSIZE) != param.end()) {
int maxramsize = param[MAXRAMSIZE];
struct rlimit limit;
- limit.rlim_cur = maxramsize * 1024;
- limit.rlim_max = int(maxramsize * 1.1) * 1024;
+ limit.rlim_cur = maxramsize * 1024 * 1024;
+ limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
setrlimit(RLIMIT_AS, &limit);
}
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+ //int result = execv("/usr/bin/ssh", parmList);
+ //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+ //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
// On cree une session pour le fils de facon a ce qu'il ne soit pas
// detruit lorsque le shell se termine (le shell ouvre une session et
// On execute la commande du fils
- execve(execpath, argv, envp);
-
+ execve(argv[0], argv, envp);
+ UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
// No need to deallocate since nothing happens after a successful exec
// Normalement on ne devrait jamais arriver ici
try {
// EXECUTABLE is MANDATORY, if missing, we throw an exception
- string exec_command;
+ vector<string> exec_command;
if (param.find(EXECUTABLE) != param.end()) {
exec_command = _bm.exec_command(param);
} else {
throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
}
- string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
-
- if (param.find(ARGUMENTS) != param.end()) {
- Versatile V = param[ARGUMENTS];
-
- for(Versatile::const_iterator it=V.begin() ; it!=V.end() ; it++) {
- StringType argt = * static_cast<StringType *>(*it);
- exec_command += string(" ") + string(argt);
- }
+ // Build the command string from the command argument vector
+ string comstr;
+ for (unsigned int i=0 ; i<exec_command.size() ; i++) {
+ if (i>0) comstr += " ";
+ comstr += exec_command[i];
}
-
- UNDER_LOCK( cout << "*** exec_command = " << exec_command << endl );
-
+ UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
// Create the environment for the new process. Note (RB): Here we change the environment for
// the process launched in local. It would seem more logical to set the environment for the
ZeroMemory( &pi, sizeof(pi) );
// Copy the command to a non-const buffer
- size_t str_size = exec_command.size();
- char buffer[str_size+1];
- exec_command.copy(buffer,str_size);
- buffer[str_size]='\0';
+ char * buffer = strdup(comstr.c_str());
// launch the new process
BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
- 0, chNewEnv, NULL, &si, &pi);
+ CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
+ if (buffer) free(buffer);
if (!res) throw RunTimeException("Error while creating new process");
CloseHandle(pi.hThread);
delete p_ta;
}
+ void BatchManager_Local::setFailedOnCancel(void * arg)
+ {
+ ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+ pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
+ p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
+ pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
+
+ // Unlock the master thread. From here, the batch manager instance should not be used.
+ pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
+ }
+
}