1 // Copyright (C) 2007-2012 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 * BatchManager_Local.cxx :
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail : mailto:ivan.dutka-malen@der.edf.fr
27 * Date : Thu Nov 6 10:17:22 2003
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
41 #include <sys/types.h>
54 #include "Batch_Constants.hxx"
55 #include "Batch_IOMutex.hxx"
56 #include "Batch_BatchManager_Local.hxx"
57 #include "Batch_RunTimeException.hxx"
// Constructor. Forwards parent, host, username, protocolType and mpiImpl to the BatchManager
// base class, sets _connect to 0, then initializes _threads_mutex and _threadSyncCondition
// with default attributes (the return codes of both init calls are not checked).
// NOTE(review): parts of the parameter list and of the initializer list are not visible here.
65 BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
66 const char * username,
67 CommunicationProtocolType protocolType, const char * mpiImpl,
69 : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
72 pthread_mutex_init(&_threads_mutex, NULL);
73 pthread_cond_init(&_threadSyncCondition, NULL);
// Destructor. Walks every entry of _threads; a job whose STATE is neither FINISHED nor FAILED
// gets a warning printed, its thread is cancelled with pthread_cancel, and the destructor then
// waits on _threadSyncCondition (signalled by setFailedOnCancel) before moving on.
// The mutex and the condition variable are destroyed at the end.
// NOTE(review): pthread_cond_wait is called once with no predicate re-check; POSIX allows
// spurious wakeups, so the wait may return before the thread has actually finished -- confirm.
// NOTE(review): the loop header (begin/end/iter++) touches _threads outside the mutex, which is
// only taken inside the loop body -- confirm this is safe while job threads are still running.
77 BatchManager_Local::~BatchManager_Local()
79 for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
80 pthread_mutex_lock(&_threads_mutex);
81 string state = iter->second.param[STATE];
82 if (state != FINISHED && state != FAILED) {
83 UNDER_LOCK( cout << "Warning: Job " << iter->first <<
84 " is not finished, it will now be canceled." << endl );
85 pthread_cancel(iter->second.thread_id);
86 pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
88 pthread_mutex_unlock(&_threads_mutex);
90 pthread_mutex_destroy(&_threads_mutex);
91 pthread_cond_destroy(&_threadSyncCondition);
94 // Job control method: submits a job to the manager.
// Exports the job's input files, takes the next value of _idCounter as the job id, allocates a
// ThreadAdapter for it and creates a thread (attributes set to PTHREAD_CREATE_DETACHED).
// The caller then waits on _threadSyncCondition, which pere() signals once it has filled in
// the job's entry in _threads, and finally returns a JobId built from id_sst.
// Throws RunTimeException("Can't create new thread in BatchManager_Local"); the condition
// guarding that throw is not visible in this excerpt (presumably rc != 0 -- confirm).
// NOTE(review): jobLocal is constructed but `job` is what is passed to ThreadAdapter; jobLocal
// looks unused in the visible lines -- confirm.
// NOTE(review): p_ta does not appear to be released on the throwing path -- confirm.
95 const JobId BatchManager_Local::submitJob(const Job & job)
97 // export input files in the working directory of the execution host
98 exportInputFiles(job);
100 Job_Local jobLocal = job;
101 Id id = _idCounter++;
102 ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
104 // Thread attributes used at creation
105 pthread_attr_t thread_attr;
106 pthread_attr_init(&thread_attr);
107 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
109 // Create the thread that will run the system command passed to it
111 pthread_mutex_lock(&_threads_mutex);
112 int rc = pthread_create(&thread_id,
115 static_cast<void *>(p_ta));
117 // Release the memory held by the thread attributes, which is no longer needed
118 pthread_attr_destroy(&thread_attr);
121 pthread_mutex_unlock(&_threads_mutex);
122 throw RunTimeException("Can't create new thread in BatchManager_Local");
125 pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
126 pthread_mutex_unlock(&_threads_mutex);
128 ostringstream id_sst;
130 return JobId(this, id_sst.str());
133 // Job control method: removes a job from the manager.
// Builds an input stream from the JobId reference (the extraction of the id itself is not
// visible here) and looks the id up in _threads under the mutex. In the visible lines, a job
// whose STATE is neither FINISHED nor FAILED has its thread cancelled, after which this call
// waits on _threadSyncCondition; otherwise a "Cannot delete job" message is printed.
// Throws RunTimeException("Job with id ... does not exist"); the guarding condition is not
// visible (presumably !idFound -- confirm).
// NOTE(review): the "Cannot delete job" message goes to cout without UNDER_LOCK, unlike the
// other console output in this file -- confirm whether that is intended.
134 void BatchManager_Local::deleteJob(const JobId & jobid)
138 istringstream iss(jobid.getReference());
141 // @@@ --------> CRITICAL SECTION <-------- @@@
142 pthread_mutex_lock(&_threads_mutex);
143 bool idFound = (_threads.find(id) != _threads.end());
145 string state = _threads[id].param[STATE];
146 if (state != FINISHED && state != FAILED) {
147 pthread_cancel(_threads[id].thread_id);
148 pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
150 cout << "Cannot delete job " << jobid.getReference() <<
151 ". Job is already finished." << endl;
154 pthread_mutex_unlock(&_threads_mutex);
155 // @@@ --------> CRITICAL SECTION <-------- @@@
158 throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
161 // Job control method: holds (suspends) a queued job.
// Pushes a HOLD command onto the command queue of the job's entry in _threads, under the mutex.
// If the id is not present in _threads, nothing is pushed and no error is reported.
162 void BatchManager_Local::holdJob(const JobId & jobid)
165 istringstream iss(jobid.getReference());
168 UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
170 // Push a command onto the thread's command queue
171 // @@@ --------> CRITICAL SECTION <-------- @@@
172 pthread_mutex_lock(&_threads_mutex);
173 if (_threads.find(id) != _threads.end())
174 _threads[id].command_queue.push(HOLD);
175 pthread_mutex_unlock(&_threads_mutex);
176 // @@@ --------> CRITICAL SECTION <-------- @@@
179 // Job control method: releases a held job.
// Pushes a RELEASE command onto the command queue of the job's entry in _threads, under the
// mutex. If the id is not present in _threads, nothing is pushed and no error is reported.
180 void BatchManager_Local::releaseJob(const JobId & jobid)
183 istringstream iss(jobid.getReference());
186 UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
188 // Push a command onto the thread's command queue
189 // @@@ --------> CRITICAL SECTION <-------- @@@
190 pthread_mutex_lock(&_threads_mutex);
191 if (_threads.find(id) != _threads.end())
192 _threads[id].command_queue.push(RELEASE);
193 pthread_mutex_unlock(&_threads_mutex);
194 // @@@ --------> CRITICAL SECTION <-------- @@@
198 // Job control method: alters a queued job.
// Three-argument overload that the two-argument overloads delegate to.
// NOTE(review): the body of this overload is not visible in this excerpt.
199 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
203 // Job control method: alters a queued job.
// Convenience overload: delegates to the three-argument overload with a default-constructed
// Environnement.
204 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
206 alterJob(jobid, param, Environnement());
209 // Job control method: alters a queued job.
// Convenience overload: delegates to the three-argument overload with a default-constructed
// Parametre.
210 void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
212 alterJob(jobid, Parametre(), env);
217 // Job control method: returns the job state.
// Looks the id up in _threads with find() under the mutex and copies the entry's param and env
// (the guard around those two copies is not visible here -- presumably `if (found)`).
// Throws InvalidArgumentException when the id is not registered; otherwise builds a
// JobInfo_Local from the copied param and env.
218 JobInfo BatchManager_Local::queryJob(const JobId & jobid)
221 istringstream iss(jobid.getReference());
227 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
228 // @@@ --------> CRITICAL SECTION <-------- @@@
229 pthread_mutex_lock(&_threads_mutex);
230 std::map<Id, Child >::iterator pos = _threads.find(id);
231 bool found = (pos != _threads.end());
233 param = pos->second.param;
234 env = pos->second.env;
236 pthread_mutex_unlock(&_threads_mutex);
237 // @@@ --------> CRITICAL SECTION <-------- @@@
238 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
240 if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
242 JobInfo_Local ji(param, env);
247 // This manager cannot resume (re-attach to) a job.
248 // The job state is therefore forced to error - to do so, no Id is assigned.
// Both arguments are ignored in the visible lines; the returned JobId always carries the
// reference "undefined".
251 BatchManager_Local::addJob(const Batch::Job & job, const std::string reference)
253 return JobId(this, "undefined");
256 // Job control method: tests whether a job is present on the machine.
// Under the mutex, compares the STATE parameter of the job's entry in _threads with RUNNING.
// NOTE(review): _threads[id] uses std::map::operator[], which inserts a default-constructed
// Child when id is absent; queryJob uses find() instead. Confirm whether querying an unknown
// id is meant to create an entry.
257 bool BatchManager_Local::isRunning(const JobId & jobid)
260 istringstream iss(jobid.getReference());
263 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
264 // @@@ --------> CRITICAL SECTION <-------- @@@
265 pthread_mutex_lock(&_threads_mutex);
266 bool running = (_threads[id].param[STATE].str() == RUNNING);
267 pthread_mutex_unlock(&_threads_mutex);
268 // @@@ --------> CRITICAL SECTION <-------- @@@
269 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
// Builds the argument vector used to launch the job.
// The sub-command has the form "cd <WORKDIR> && ./<file name of EXECUTABLE>", followed by each
// ARGUMENTS entry, " <stdin" when an INFILE remote name is "stdin", and " 1>stdout" /
// " 2>stderr" when an OUTFILE remote name is "stdout" / "stderr". The result is handed to
// _protocol.getExecCommandArgs together with _hostname and the USER parameter when present.
// NOTE(review): the _MAX_DRIVE/_splitpath lines look Windows-only; the enclosing preprocessor
// guard is not visible in this excerpt.
// NOTE(review): WORKDIR, the file name and the arguments are inserted into the command text
// without quoting -- confirm that callers never pass values containing shell metacharacters.
275 vector<string> BatchManager_Local::exec_command(const Parametre & param) const
277 ostringstream exec_sub_cmd;
280 char drive[_MAX_DRIVE];
281 _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
282 if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
285 string fileToExecute = param[EXECUTABLE].str();
// When no '/' is found, find_last_of returns npos and p1+1 wraps around to 0, so the whole
// string is kept as the file name.
286 string::size_type p1 = fileToExecute.find_last_of("/");
287 string fileNameToExecute = fileToExecute.substr(p1+1);
289 exec_sub_cmd << "cd " << param[WORKDIR] << " && ./" << fileNameToExecute;
291 if (param.find(ARGUMENTS) != param.end()) {
292 Versatile V = param[ARGUMENTS];
293 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
294 StringType argt = * static_cast<StringType *>(*it);
296 exec_sub_cmd << " " << arg;
300 if (param.find(INFILE) != param.end()) {
301 Versatile V = param[INFILE];
302 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
303 Couple cpl = * static_cast<CoupleType*>(*it);
304 string remote = cpl.getRemote();
305 if (remote == "stdin")
306 exec_sub_cmd << " <stdin";
310 if (param.find(OUTFILE) != param.end()) {
311 Versatile V = param[OUTFILE];
312 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
313 Couple cpl = * static_cast<CoupleType*>(*it);
314 string remote = cpl.getRemote();
315 if (remote == "stdout") exec_sub_cmd << " 1>stdout";
316 if (remote == "stderr") exec_sub_cmd << " 2>stderr";
321 Parametre::const_iterator it = param.find(USER);
322 if (it != param.end()) {
323 user = string(it->second);
326 return _protocol.getExecCommandArgs(exec_sub_cmd.str(), _hostname, user);
331 // ThreadAdapter constructor: initializes _bm, _job and _id from the arguments.
332 BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
333 _bm(bm), _job(job), _id(id)
340 // Thread entry point.
// arg is cast to the ThreadAdapter that describes the job. The thread blocks all signals,
// enables deferred cancellation, registers three cleanup handlers (delete_on_exit,
// setFailedOnCancel, kill_child_on_exit), starts the child process and, under the manager's
// mutex, sets the job's STATE to FAILED or FINISHED depending on the entry's hasFailed flag.
// NOTE(review): several lines (declarations, platform guards, the calls made in the
// child/parent branches) are not visible in this excerpt.
341 void * BatchManager_Local::ThreadAdapter::run(void * arg)
343 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
346 // Block all signals for this thread
348 sigfillset(&setmask);
349 pthread_sigmask(SIG_BLOCK, &setmask, NULL);
352 // Allow deferred cancellation of the thread
353 // (these are the default values but they are set explicitly as a precaution)
354 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
355 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
357 // Register the function that kills the child if the thread is stopped.
358 // This function is called automatically when a cancellation request
359 // is acted upon by pthread_testcancel()
361 pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
362 pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
363 pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
365 // Fork/exec a new process so that the child can be controlled
366 // (more finely than with a call to system())
367 // int rc = system(commande.c_str());
368 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
369 //execv("/usr/bin/ssh", parmList);
371 child = p_ta->launchWin32ChildProcess();
375 if (child < 0) { // error
376 UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
378 } else if (child > 0) { // parent
386 pthread_mutex_lock(&p_ta->_bm._threads_mutex);
388 // Set the job state to FINISHED or FAILED
389 p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
// NOTE(review): cleanup handlers are popped in reverse order of registration, so the three
// pops below remove kill_child_on_exit, setFailedOnCancel and delete_on_exit in that order;
// the per-pop comments do not match that order.
391 // Remove the memory cleanup function
392 pthread_cleanup_pop(0);
394 // Remove the child-kill function
395 pthread_cleanup_pop(0);
397 // remove setFailedOnCancel function from cancel stack
398 pthread_cleanup_pop(0);
400 pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
402 // Invoke the memory cleanup function
405 UNDER_LOCK( cout << "Father is leaving" << endl );
// Parent-side monitoring of the child process ("pere" is French for "father").
// Records the start time, fills in the job's entry in _bm._threads (thread id, child handle,
// hasFailed = false, parameters with ID and STATE = RUNNING, environment, an initial NOP
// command) and signals _threadSyncCondition so that the submitting thread can continue.
// It then polls the child: one visible path uses GetExitCodeProcess, the other uses
// waitpid(..., WNOHANG) (the platform guards separating them are not visible here).
// While polling it checks for cancellation, enforces MAXWALLTIME (elapsed time is computed in
// minutes; TERM is queued once the limit is exceeded and KILL once 110% of it is exceeded) and
// drains the command queue, sending SIGSTOP / SIGCONT / SIGTERM / SIGKILL to the child.
// NOTE(review): the switch labels that map queue commands to signals are not visible here.
413 void BatchManager_Local::ThreadAdapter::pere(Process child)
415 time_t child_starttime = time(NULL);
417 // Register the child in the thread table
418 pthread_t thread_id = pthread_self();
420 Parametre param = _job.getParametre();
421 Environnement env = _job.getEnvironnement();
423 ostringstream id_sst;
425 param[ID] = id_sst.str();
426 param[STATE] = Batch::RUNNING;
431 _bm._threads[_id].thread_id = thread_id;
433 _bm._threads[_id].pid = child;
435 _bm._threads[_id].hasFailed = false;
436 _bm._threads[_id].param = param;
437 _bm._threads[_id].env = env;
438 _bm._threads[_id].command_queue.push(NOP);
440 // Unlock the master thread. From here, all shared variables must be protected
441 // from concurrent access
442 pthread_cond_signal(&_bm._threadSyncCondition);
445 // Loop until the child has terminated
449 GetExitCodeProcess(child, &exitCode);
450 if (exitCode != STILL_ACTIVE) {
451 UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
456 pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
457 if (child_wait_rc > 0) {
458 UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
459 UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
460 UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
461 UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
462 UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
463 UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
464 UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
466 UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
468 if (WIFSTOPPED(child_rc)) {
469 // NOTE: to enter this section, the WUNTRACED flag must be set
470 // in the waitpid call above. This flag is currently disabled
471 // because, although it is possible to detect that a process has stopped,
472 // it is harder to detect that it has resumed.
474 // The child is merely stopped
475 // @@@ --------> CRITICAL SECTION <-------- @@@
476 pthread_mutex_lock(&_bm._threads_mutex);
477 _bm._threads[_id].param[STATE] = Batch::PAUSED;
478 pthread_mutex_unlock(&_bm._threads_mutex);
479 // @@@ --------> CRITICAL SECTION <-------- @@@
480 UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
484 // The child has terminated; leave the loop and the enclosing if
485 UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
489 else if (child_wait_rc == -1) {
490 // The child has vanished ...
491 // @@@ --------> CRITICAL SECTION <-------- @@@
492 pthread_mutex_lock(&_bm._threads_mutex);
493 _bm._threads[_id].hasFailed = true;
494 pthread_mutex_unlock(&_bm._threads_mutex);
495 // @@@ --------> CRITICAL SECTION <-------- @@@
496 UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
501 // Check whether the thread must be destroyed
502 pthread_testcancel();
506 // Check whether the child has exceeded its allotted (wallclock) time
507 time_t child_currenttime = time(NULL);
508 long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
509 if (param.find(MAXWALLTIME) != param.end()) {
510 long maxwalltime = param[MAXWALLTIME];
511 // cout << "child_starttime = " << child_starttime << endl
512 // << "child_currenttime = " << child_currenttime << endl
513 // << "child_elapsedtime = " << child_elapsedtime << endl
514 // << "maxwalltime = " << maxwalltime << endl
515 // << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
516 if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // Allow a 10% margin before the KILL
517 UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
518 // Push a command onto the thread's command queue
519 // @@@ --------> CRITICAL SECTION <-------- @@@
520 pthread_mutex_lock(&_bm._threads_mutex);
521 if (_bm._threads.find(_id) != _bm._threads.end())
522 _bm._threads[_id].command_queue.push(KILL);
523 pthread_mutex_unlock(&_bm._threads_mutex);
524 // @@@ --------> CRITICAL SECTION <-------- @@@
527 } else if (child_elapsedtime_minutes > maxwalltime ) {
528 UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
529 // Push a command onto the thread's command queue
530 // @@@ --------> CRITICAL SECTION <-------- @@@
531 pthread_mutex_lock(&_bm._threads_mutex);
532 if (_bm._threads.find(_id) != _bm._threads.end())
533 _bm._threads[_id].command_queue.push(TERM);
534 pthread_mutex_unlock(&_bm._threads_mutex);
535 // @@@ --------> CRITICAL SECTION <-------- @@@
541 // Check whether there is anything to do in the command queue
542 // @@@ --------> CRITICAL SECTION <-------- @@@
543 pthread_mutex_lock(&_bm._threads_mutex);
544 if (_bm._threads.find(_id) != _bm._threads.end()) {
545 while (_bm._threads[_id].command_queue.size() > 0) {
546 Commande cmd = _bm._threads[_id].command_queue.front();
547 _bm._threads[_id].command_queue.pop();
551 UNDER_LOCK( cout << "Father does nothing to his child" << endl );
555 UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
556 kill(child, SIGSTOP);
560 UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
561 kill(child, SIGCONT);
565 UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
566 kill(child, SIGTERM);
570 UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
571 kill(child, SIGKILL);
583 pthread_mutex_unlock(&_bm._threads_mutex);
584 // @@@ --------> CRITICAL SECTION <-------- @@@
586 // Pause briefly so as not to load the processor needlessly
// Child-side code ("fils" is French for "son").
// Obtains the command vector from _bm.exec_command when the EXECUTABLE parameter is present,
// copies it into a NULL-terminated argv array, builds envp as "key=value" strings from the
// job's environment, and calls execve(argv[0], argv, envp). The lines after execve only run
// if the call fails: they print strerror(errno) and write a message to "error.log".
// GenericException is caught and reported on std::cerr.
// NOTE(review): the resource-limit section (MAXCPUTIME / MAXDISKSIZE / MAXRAMSIZE) is described
// by its own comment as deactivated; the mechanism that disables it is not visible here.
601 void BatchManager_Local::ThreadAdapter::fils()
603 Parametre param = _job.getParametre();
604 Parametre::iterator it;
606 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
607 //int result = execv("/usr/bin/ssh", parmList);
608 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
609 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
612 // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
613 vector<string> command;
614 if (param.find(EXECUTABLE) != param.end()) {
615 command = _bm.exec_command(param);
618 // Build the argument array argv from the command
619 char ** argv = new char * [command.size() + 1];
621 for (string::size_type i=0 ; i<command.size() ; i++) {
622 argv[i] = new char[command[i].size() + 1];
623 strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
624 if (i>0) comstr += " # ";
625 comstr += command[i];
628 argv[command.size()] = NULL;
630 UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
631 UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
633 // Create the environment for the new process. Note (RB): Here we change the environment for
634 // the process launched in local. It would seem more logical to set the environment for the
636 Environnement env = _job.getEnvironnement();
640 envp = new char * [env.size() + 1]; // 1 for the terminating NULL
642 for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
643 const string & key = (*it).first;
644 const string & value = (*it).second;
646 oss << key << "=" << value;
647 envp[i] = new char [oss.str().size() + 1];
648 strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
651 // assert (i == env.size())
655 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
656 //int result = execv("/usr/bin/ssh", parmList);
657 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
658 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
662 // Set the system limits imposed on the child
663 // This part is deactivated because those limits should be set on the job process, not on
664 // the ssh process. If it is done properly one day, beware of the types used (int is not enough)
666 if (param.find(MAXCPUTIME) != param.end()) {
667 int maxcputime = param[MAXCPUTIME];
669 limit.rlim_cur = maxcputime;
670 limit.rlim_max = int(maxcputime * 1.1);
671 setrlimit(RLIMIT_CPU, &limit);
674 if (param.find(MAXDISKSIZE) != param.end()) {
675 int maxdisksize = param[MAXDISKSIZE];
677 limit.rlim_cur = maxdisksize * 1024;
678 limit.rlim_max = int(maxdisksize * 1.1) * 1024;
679 setrlimit(RLIMIT_FSIZE, &limit);
682 if (param.find(MAXRAMSIZE) != param.end()) {
683 int maxramsize = param[MAXRAMSIZE];
685 limit.rlim_cur = maxramsize * 1024 * 1024;
686 limit.rlim_max = int(maxramsize * 1.1) * 1024 * 1024;
687 setrlimit(RLIMIT_AS, &limit);
692 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
693 //int result = execv("/usr/bin/ssh", parmList);
694 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
695 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
697 // Create a session for the child so that it is not
698 // destroyed when the shell terminates (the shell opens a session and
699 // kills all processes belonging to the session on exit)
703 // Close the standard file descriptors
704 //close(STDIN_FILENO);
705 //close(STDOUT_FILENO);
706 //close(STDERR_FILENO);
709 // Execute the child's command
710 execve(argv[0], argv, envp);
711 UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
712 // No need to deallocate since nothing happens after a successful exec
714 // Normally we should never get here
715 ofstream file_err("error.log");
716 UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
718 } catch (GenericException & e) {
720 std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
// Launches the job as a child process through the Win32 CreateProcess API.
// The command vector from _bm.exec_command is joined with single spaces into one command
// line, the job's environment is packed into a block of NUL-terminated "key=value" strings
// closed by an extra NUL, and the process is created with CREATE_NO_WINDOW.
// Throws RunTimeException when EXECUTABLE is missing or when CreateProcess fails; a
// GenericException is caught at the end and reported on std::cerr.
// NOTE(review): the environment block is a fixed 4096-char buffer and no bounds check is
// visible in the copy loop -- a large environment could overrun it; confirm.
// NOTE(review): no release of chNewEnv is visible in this excerpt -- confirm it is freed.
728 BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
730 Parametre param = _job.getParametre();
731 Parametre::iterator it;
732 PROCESS_INFORMATION pi;
736 // EXECUTABLE is MANDATORY, if missing, we throw an exception
737 vector<string> exec_command;
738 if (param.find(EXECUTABLE) != param.end()) {
739 exec_command = _bm.exec_command(param);
741 throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
744 // Build the command string from the command argument vector
746 for (unsigned int i=0 ; i<exec_command.size() ; i++) {
747 if (i>0) comstr += " ";
748 comstr += exec_command[i];
751 UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
753 // Create the environment for the new process. Note (RB): Here we change the environment for
754 // the process launched in local. It would seem more logical to set the environment for the
756 // Note that if no environment is specified, we reuse the current environment.
757 Environnement env = _job.getEnvironnement();
758 char * chNewEnv = NULL;
761 chNewEnv = new char[4096];
762 LPTSTR lpszCurrentVariable = chNewEnv;
763 for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
764 const string & key = (*it).first;
765 const string & value = (*it).second;
766 string envvar = key + "=" + value;
767 envvar.copy(lpszCurrentVariable, envvar.size());
768 lpszCurrentVariable[envvar.size()] = '\0';
769 lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
771 // Terminate the block with a NULL byte.
772 *lpszCurrentVariable = '\0';
777 ZeroMemory( &si, sizeof(si) );
779 ZeroMemory( &pi, sizeof(pi) );
781 // Copy the command to a non-const buffer
782 char * buffer = strdup(comstr.c_str());
784 // launch the new process
785 bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
786 CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
788 if (buffer) free(buffer);
789 if (!res) throw RunTimeException("Error while creating new process");
791 CloseHandle(pi.hThread);
793 } catch (GenericException & e) {
795 std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
// Cleanup handler registered by ThreadAdapter::run: reads the child pid through p_pid and
// sends it SIGTERM.
// NOTE(review): the TODO below suggests this is not yet ported to every platform; the
// platform guards are not visible in this excerpt.
804 void BatchManager_Local::kill_child_on_exit(void * p_pid)
807 //TODO: porting of following functionality
808 pid_t child = * static_cast<pid_t *>(p_pid);
811 kill(child, SIGTERM);
813 // Note: a kill(child, SIGKILL) could follow here,
814 // but this option is not implemented for now, since it is
815 // preferable to let the child process terminate normally on its own.
// Cleanup handler registered by ThreadAdapter::run; arg is cast back to the ThreadAdapter.
// NOTE(review): the rest of the body is not visible here (presumably it deletes p_ta -- confirm).
819 void BatchManager_Local::delete_on_exit(void * arg)
821 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
// Cleanup handler registered by ThreadAdapter::run: under the manager's mutex, sets the job's
// STATE parameter to FAILED, then signals _threadSyncCondition so that a thread waiting on it
// (the destructor or deleteJob after pthread_cancel) can continue.
825 void BatchManager_Local::setFailedOnCancel(void * arg)
827 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
828 pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
829 p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
830 pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
832 // Unlock the master thread. From here, the batch manager instance should not be used.
833 pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);