_idCounter(0)
{
pthread_mutex_init(&_threads_mutex, NULL);
- pthread_cond_init(&_threadLaunchCondition, NULL);
+ pthread_cond_init(&_threadSyncCondition, NULL);
}
// Destructeur
BatchManager_Local::~BatchManager_Local()
{
+ for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
+ pthread_mutex_lock(&_threads_mutex);
+ string state = iter->second.param[STATE];
+ if (state != FINISHED && state != FAILED) {
+ UNDER_LOCK( cout << "Warning: Job " << iter->first <<
+ " is not finished, it will now be canceled." << endl );
+ pthread_cancel(iter->second.thread_id);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+ }
+ pthread_mutex_unlock(&_threads_mutex);
+ }
pthread_mutex_destroy(&_threads_mutex);
- pthread_cond_destroy(&_threadLaunchCondition);
+ pthread_cond_destroy(&_threadSyncCondition);
}
const CommunicationProtocol & BatchManager_Local::getProtocol() const
throw RunTimeException("Can't create new thread in BatchManager_Local");
}
- pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
pthread_mutex_unlock(&_threads_mutex);
ostringstream id_sst;
istringstream iss(jobid.getReference());
iss >> id;
- Status status;
-
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- status = _threads[id].status;
+ bool running = (_threads[id].param[STATE].str() == RUNNING);
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
//UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
- return (status == RUNNING);
+ return running;
}
// Methode de destruction d'un job
void BatchManager_Local::cancel(pthread_t thread_id)
{
+ pthread_mutex_lock(&_threads_mutex);
pthread_cancel(thread_id);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+ pthread_mutex_unlock(&_threads_mutex);
}
// Methode d'execution du thread
void * BatchManager_Local::ThreadAdapter::run(void * arg)
{
+ ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+
#ifndef WIN32
// On bloque tous les signaux pour ce thread
sigset_t setmask;
// Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
// sera prise en compte par pthread_testcancel()
Process child;
+ pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
- ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
-
-
-
// Le code retour cumule (ORed) de tous les appels
// Nul en cas de reussite de l'ensemble des operations
}
}
+ pthread_mutex_lock(&p_ta->_bm._threads_mutex);
+ // Set the job state to FINISHED or FAILED
+ p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
// On retire la fonction de nettoyage de la memoire
pthread_cleanup_pop(0);
// On retire la fonction de suppression du fils
pthread_cleanup_pop(0);
+ // remove setFailedOnCancel function from cancel stack
+ pthread_cleanup_pop(0);
+ pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
// On invoque la fonction de nettoyage de la memoire
delete_on_exit(arg);
#ifndef WIN32
_bm._threads[_id].pid = child;
#endif
- _bm._threads[_id].status = RUNNING;
+ _bm._threads[_id].hasFailed = false;
_bm._threads[_id].param = param;
_bm._threads[_id].env = env;
_bm._threads[_id].command_queue.push(NOP);
// Unlock the master thread. From here, all shared variables must be protected
// from concurrent access
- pthread_cond_signal(&_bm._threadLaunchCondition);
+ pthread_cond_signal(&_bm._threadSyncCondition);
// on boucle en attendant que le fils ait termine
DWORD exitCode;
BOOL res = GetExitCodeProcess(child, &exitCode);
if (exitCode != STILL_ACTIVE) {
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = DONE;
- _bm._threads[_id].param[STATE] = Batch::FINISHED;
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
break;
}
// Le fils est simplement stoppe
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = STOPPED;
_bm._threads[_id].param[STATE] = Batch::PAUSED;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
}
else {
// Le fils est termine, on sort de la boucle et du if englobant
- // @@@ --------> SECTION CRITIQUE <-------- @@@
- pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = DONE;
- _bm._threads[_id].param[STATE] = Batch::FINISHED;
- pthread_mutex_unlock(&_bm._threads_mutex);
- // @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
break;
}
// Le fils a disparu ...
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_bm._threads_mutex);
- _bm._threads[_id].status = DEAD;
- _bm._threads[_id].param[STATE] = Batch::FAILED;
+ _bm._threads[_id].hasFailed = true;
pthread_mutex_unlock(&_bm._threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
delete p_ta;
}
+ void BatchManager_Local::setFailedOnCancel(void * arg)
+ {
+ ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
+ pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
+ p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
+ pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
+
+ // Unlock the master thread. From here, the batch manager instance should not be used.
+ pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);
+ }
+
}