#include <sys/types.h>
#ifdef WIN32
#include <direct.h>
-#include "Batch_RunTimeException.hxx"
#else
#include <sys/wait.h>
#include <unistd.h>
#include <string.h>
#include "Batch_IOMutex.hxx"
#include "Batch_BatchManager_Local.hxx"
+#include "Batch_RunTimeException.hxx"
using namespace std;
// Constructeur
BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
CommunicationProtocolType protocolType)
- : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
+ : BatchManager(parent, host), _connect(0),
_protocol(CommunicationProtocol::getInstance(protocolType)),
- _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
+ _idCounter(0)
{
pthread_mutex_init(&_threads_mutex, NULL);
- pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
- pthread_cond_init(&_thread_id_id_association_cond, NULL);
+ pthread_cond_init(&_threadLaunchCondition, NULL);
}
// Destructeur
BatchManager_Local::~BatchManager_Local()
{
pthread_mutex_destroy(&_threads_mutex);
- pthread_mutex_destroy(&_thread_id_id_association_mutex);
- pthread_cond_destroy(&_thread_id_id_association_cond);
+ pthread_cond_destroy(&_threadLaunchCondition);
}
const CommunicationProtocol & BatchManager_Local::getProtocol() const
const JobId BatchManager_Local::submitJob(const Job & job)
{
Job_Local jobLocal = job;
+ Id id = _idCounter++;
+ ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
+
+ // Les attributs du thread a sa creation
+ pthread_attr_t thread_attr;
+ pthread_attr_init(&thread_attr);
+ pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
+
+ // Creation du thread qui va executer la commande systeme qu'on lui passe
+ pthread_t thread_id;
+ pthread_mutex_lock(&_threads_mutex);
+ int rc = pthread_create(&thread_id,
+ &thread_attr,
+ &ThreadAdapter::run,
+ static_cast<void *>(p_ta));
- pthread_t thread_id = submit(jobLocal);
+ // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
+ pthread_attr_destroy(&thread_attr);
- ostringstream oss;
- oss << getIdByThread_id(thread_id);
+ if (rc != 0) {
+ pthread_mutex_unlock(&_threads_mutex);
+ throw RunTimeException("Can't create new thread in BatchManager_Local");
+ }
- JobId id(this, oss.str());
+ pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex);
+ pthread_mutex_unlock(&_threads_mutex);
- return id;
+ ostringstream id_sst;
+ id_sst << id;
+ return JobId(this, id_sst.str());
}
// Methode pour le controle des jobs : retire un job du gestionnaire
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- param = _threads[id].param;
- env = _threads[id].env;
+ std::map<Id, Child >::iterator pos = _threads.find(id);
+ bool found = (pos != _threads.end());
+ if (found) {
+ param = pos->second.param;
+ env = pos->second.env;
+ }
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
+ if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
+
JobInfo_Local ji(param, env);
return ji;
}
return (status == RUNNING);
}
-
- // Methode d'execution d'un job
- pthread_t BatchManager_Local::submit(const Job_Local & job)
- {
- // L'id du thread a creer
- pthread_t thread_id =
-#ifdef WIN32
- {0,0};
-#else
- 0;
-#endif
-
- // Les attributs du thread a sa creation
- pthread_attr_t thread_attr;
- pthread_attr_init(&thread_attr);
- pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
-
- ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
-
- // Creation du thread qui va executer la commande systeme qu'on lui passe
- int rc = pthread_create(&thread_id,
- &thread_attr,
- &ThreadAdapter::run,
- static_cast<void *>(p_ta));
- if (rc) {
- }
-
- // Liberation des zones memoire maintenant inutiles occupees par les attributs du thread
- pthread_attr_destroy(&thread_attr);
-
- return thread_id;
- }
-
-
// Methode de destruction d'un job
void BatchManager_Local::cancel(pthread_t thread_id)
{
}
- // Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
- // au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
- // ATTENTION : cette methode est uniquement protegee par la section critique de l'association
- // Thread_id / Id (_thread_id_id_association_mutex)
- BatchManager_Local::Id BatchManager_Local::nextId()
- {
- static Id id = 0;
- Id nextId = id++;
- //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
- return nextId;
- }
-
-
- // Retourne l'Id enregistre dans l'association Thread_id / Id et le detruit immediatement
- BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
- {
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_thread_id_id_association_mutex);
- bool threadIdFound = false;
- std::list<struct ThreadIdIdAssociation>::iterator it;
- while (!threadIdFound) {
- for (it = _thread_id_id_association.begin() ;
- it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
- it++);
- if (it == _thread_id_id_association.end())
- pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
- else
- threadIdFound = true;
- }
-
- Id id = it->id;
- _thread_id_id_association.erase(it);
-
- pthread_mutex_unlock(&_thread_id_id_association_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
-
- //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
- return id;
- }
-
-
- // Associe un Thread_id a un Id nouvellement cree
- BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
- {
- Id id = -1;
-
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_thread_id_id_association_mutex);
- std::list<struct ThreadIdIdAssociation>::iterator it;
- for (it = _thread_id_id_association.begin() ;
- it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
- it++);
- if (it == _thread_id_id_association.end()) {
- struct ThreadIdIdAssociation newAssociation;
- id = newAssociation.id = nextId();
- newAssociation.threadId = thread_id;
- _thread_id_id_association.push_back(newAssociation);
- pthread_cond_signal(&_thread_id_id_association_cond);
-
- } else {
- UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
- }
- pthread_mutex_unlock(&_thread_id_id_association_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
-
- //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
- return id;
- }
-
// Constructeur de la classe ThreadAdapter
- BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
- _bm(bm), _job(job)
+ BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
+ _bm(bm), _job(job), _id(id)
{
// Nothing to do
}
// On enregistre le fils dans la table des threads
pthread_t thread_id = pthread_self();
- Id id = _bm.registerThread_id(thread_id);
Parametre param = _job.getParametre();
Environnement env = _job.getEnvironnement();
- ostringstream thread_id_sst;
- thread_id_sst << id;
- param[ID] = thread_id_sst.str();
+ ostringstream id_sst;
+ id_sst << _id;
+ param[ID] = id_sst.str();
param[STATE] = Batch::RUNNING;
#ifndef WIN32
param[PID] = child;
#endif
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].thread_id = thread_id;
+ _bm._threads[_id].thread_id = thread_id;
#ifndef WIN32
- _bm._threads[id].pid = child;
+ _bm._threads[_id].pid = child;
#endif
- _bm._threads[id].status = RUNNING;
- _bm._threads[id].param = param;
- _bm._threads[id].env = env;
- _bm._threads[id].command_queue.push(NOP);
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
-
+ _bm._threads[_id].status = RUNNING;
+ _bm._threads[_id].param = param;
+ _bm._threads[_id].env = env;
+ _bm._threads[_id].command_queue.push(NOP);
+ // Unlock the master thread. From here, all shared variables must be protected
+ // from concurrent access
+ pthread_cond_signal(&_bm._threadLaunchCondition);
// on boucle en attendant que le fils ait termine
BOOL res = GetExitCodeProcess(child, &exitCode);
if (exitCode != STILL_ACTIVE) {
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = DONE;
- _bm._threads[id].param[STATE] = Batch::FINISHED;
+ _bm._threads[_id].status = DONE;
+ _bm._threads[_id].param[STATE] = Batch::FINISHED;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
// Le fils est simplement stoppe
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = STOPPED;
- _bm._threads[id].param[STATE] = Batch::PAUSED;
+ _bm._threads[_id].status = STOPPED;
+ _bm._threads[_id].param[STATE] = Batch::PAUSED;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
// Le fils est termine, on sort de la boucle et du if englobant
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = DONE;
- _bm._threads[id].param[STATE] = Batch::FINISHED;
+ _bm._threads[_id].status = DONE;
+ _bm._threads[_id].param[STATE] = Batch::FINISHED;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
// Le fils a disparu ...
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[id].status = DEAD;
- _bm._threads[id].param[STATE] = Batch::FAILED;
+ _bm._threads[_id].status = DEAD;
+ _bm._threads[_id].param[STATE] = Batch::FAILED;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
// << "maxwalltime = " << maxwalltime << endl
// << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
- UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
+ UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- if (_bm._threads.find(id) != _bm._threads.end())
- _bm._threads[id].command_queue.push(KILL);
+ if (_bm._threads.find(_id) != _bm._threads.end())
+ _bm._threads[_id].command_queue.push(KILL);
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
} else if (child_elapsedtime > maxwalltime ) {
- UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
+ UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- if (_bm._threads.find(id) != _bm._threads.end())
- _bm._threads[id].command_queue.push(TERM);
+ if (_bm._threads.find(_id) != _bm._threads.end())
+ _bm._threads[_id].command_queue.push(TERM);
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
}
// On regarde s'il y a quelque chose a faire dans la queue de commande
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- if (_bm._threads.find(id) != _bm._threads.end()) {
- while (_bm._threads[id].command_queue.size() > 0) {
- Commande cmd = _bm._threads[id].command_queue.front();
- _bm._threads[id].command_queue.pop();
+ if (_bm._threads.find(_id) != _bm._threads.end()) {
+ while (_bm._threads[_id].command_queue.size() > 0) {
+ Commande cmd = _bm._threads[_id].command_queue.front();
+ _bm._threads[_id].command_queue.pop();
switch (cmd) {
case NOP: