-// Copyright (C) 2007-2010 CEA/DEN, EDF R&D, OPEN CASCADE
+// Copyright (C) 2007-2012 CEA/DEN, EDF R&D, OPEN CASCADE
//
// Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
#include <signal.h>
#include <errno.h>
#include <string.h>
+
+#include "Batch_Constants.hxx"
#include "Batch_IOMutex.hxx"
#include "Batch_BatchManager_Local.hxx"
#include "Batch_RunTimeException.hxx"
istringstream iss(jobid.getReference());
iss >> id;
- // On retrouve le thread_id du thread
- pthread_t thread_id;
-
// @@@ --------> SECTION CRITIQUE <-------- @@@
pthread_mutex_lock(&_threads_mutex);
- if (_threads.find(id) != _threads.end())
- thread_id = _threads[id].thread_id;
+ bool idFound = (_threads.find(id) != _threads.end());
+ if (idFound) {
+ string state = _threads[id].param[STATE];
+ if (state != FINISHED && state != FAILED) {
+ pthread_cancel(_threads[id].thread_id);
+ pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
+ } else {
+ cout << "Cannot delete job " << jobid.getReference() <<
+ ". Job is already finished." << endl;
+ }
+ }
pthread_mutex_unlock(&_threads_mutex);
// @@@ --------> SECTION CRITIQUE <-------- @@@
- cancel(thread_id);
+ if (!idFound)
+ throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
}
// Methode pour le controle des jobs : suspend un job en file d'attente
}
+ // Ce manager ne peut pas reprendre un job
+ // On force donc l'état du job à erreur - pour cela on ne donne pas d'Id
+ // au JobId
+ const Batch::JobId
+ BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
+ {
+ return JobId(this, "undefined");
+ }
// Methode pour le controle des jobs : teste si un job est present en machine
bool BatchManager_Local::isRunning(const JobId & jobid)
return running;
}
- // Methode de destruction d'un job
- void BatchManager_Local::cancel(pthread_t thread_id)
- {
- pthread_mutex_lock(&_threads_mutex);
- pthread_cancel(thread_id);
- pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
- pthread_mutex_unlock(&_threads_mutex);
- }
-
vector<string> BatchManager_Local::exec_command(const Parametre & param) const
{
// Cette fontion sera automatiquement appelee lorsqu'une demande d'annulation
// sera prise en compte par pthread_testcancel()
Process child;
+ pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
- pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
// Le code retour cumule (ORed) de tous les appels
while (1) {
#ifdef WIN32
DWORD exitCode;
- BOOL res = GetExitCodeProcess(child, &exitCode);
+ GetExitCodeProcess(child, &exitCode);
if (exitCode != STILL_ACTIVE) {
UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
break;
// On regarde si le fils n'a pas depasse son temps (wallclock time)
time_t child_currenttime = time(NULL);
- time_t child_elapsedtime = child_currenttime - child_starttime;
+ long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
if (param.find(MAXWALLTIME) != param.end()) {
- int maxwalltime = param[MAXWALLTIME];
+ long maxwalltime = param[MAXWALLTIME];
// cout << "child_starttime = " << child_starttime << endl
// << "child_currenttime = " << child_currenttime << endl
// << "child_elapsedtime = " << child_elapsedtime << endl
// << "maxwalltime = " << maxwalltime << endl
// << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
- if (child_elapsedtime > int(maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
+ if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // On se donne 10% de marge avant le KILL
UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
// @@@ --------> SECTION CRITIQUE <-------- @@@
- } else if (child_elapsedtime > maxwalltime ) {
+ } else if (child_elapsedtime_minutes > maxwalltime ) {
UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
// On introduit une commande dans la queue du thread
// @@@ --------> SECTION CRITIQUE <-------- @@@
// On positionne les limites systeme imposees au fils
+ // This part is deactivated because those limits should be set on the job process, not on
+ // the ssh process. If it is done properly one day, beware of the types used (int is not enough)
+ /*
if (param.find(MAXCPUTIME) != param.end()) {
int maxcputime = param[MAXCPUTIME];
struct rlimit limit;
if (param.find(MAXRAMSIZE) != param.end()) {
int maxramsize = param[MAXRAMSIZE];
struct rlimit limit;
- limit.rlim_cur = maxramsize * 1024;
- limit.rlim_max = int(maxramsize * 1.1) * 1024;
+ limit.rlim_cur = maxramsize * 1024 * 1024;
+ limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
setrlimit(RLIMIT_AS, &limit);
}
+ */
//char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
// On execute la commande du fils
- int result = execve(argv[0], argv, envp);
+ execve(argv[0], argv, envp);
UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
// No need to deallocate since nothing happens after a successful exec
char * buffer = strdup(comstr.c_str());
// launch the new process
- BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
+ bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
if (buffer) free(buffer);