1 // Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 * BatchManager_Local.cxx :
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail : mailto:ivan.dutka-malen@der.edf.fr
27 * Date : Thu Nov 6 10:17:22 2003
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
41 #include <sys/types.h>
53 #include "Batch_IOMutex.hxx"
54 #include "Batch_BatchManager_Local.hxx"
55 #include "Batch_RunTimeException.hxx"
// Constructs a local batch manager. The visible initializers forward parent and
// host to the BatchManager base, set _connect to 0 and obtain the communication
// protocol instance matching protocolType. The visible body statements create
// the mutex and the condition variable used for thread bookkeeping.
63 BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64 CommunicationProtocolType protocolType)
65 : BatchManager(parent, host), _connect(0),
66 _protocol(CommunicationProtocol::getInstance(protocolType)),
// Both synchronization objects are created with default attributes (NULL).
69 pthread_mutex_init(&_threads_mutex, NULL);
70 pthread_cond_init(&_threadLaunchCondition, NULL);
// Destroys the mutex and the condition variable created by the constructor.
// NOTE(review): job threads are created detached in submitJob and nothing visible
// here waits for them before these objects are destroyed — confirm this is safe.
74 BatchManager_Local::~BatchManager_Local()
76 pthread_mutex_destroy(&_threads_mutex);
77 pthread_cond_destroy(&_threadLaunchCondition);
// Accessor for the communication protocol used by this manager.
// NOTE(review): presumably returns the _protocol member set in the constructor — confirm.
80 const CommunicationProtocol & BatchManager_Local::getProtocol() const
85 // Job control method: submits a job to the manager.
// Creates a detached thread that runs the job through a ThreadAdapter, waits on
// _threadLaunchCondition until that thread signals it, then returns a JobId
// built from a string reference.
// Throws RunTimeException when the thread cannot be created.
86 const JobId BatchManager_Local::submitJob(const Job & job)
// NOTE(review): jobLocal is not used by any statement visible here (the adapter
// below receives `job`) — confirm whether this conversion is still needed.
88 Job_Local jobLocal = job;
90 ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
92 // Thread attributes used at creation time
93 pthread_attr_t thread_attr;
94 pthread_attr_init(&thread_attr);
// The thread is detached: it is never joined by this manager.
95 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
97 // Create the thread that will execute the system command handed to it
// The mutex is taken before the thread is created and is still held when
// pthread_cond_wait is called below.
99 pthread_mutex_lock(&_threads_mutex);
100 int rc = pthread_create(&thread_id,
103 static_cast<void *>(p_ta));
105 // Release the memory used by the thread attributes, which is no longer needed
106 pthread_attr_destroy(&thread_attr);
// Failure path: release the mutex before throwing.
// NOTE(review): p_ta is not visibly deleted on this path — possible leak, confirm.
109 pthread_mutex_unlock(&_threads_mutex);
110 throw RunTimeException("Can't create new thread in BatchManager_Local");
// Wait until the job thread signals the condition (see ThreadAdapter::pere).
// NOTE(review): no predicate loop is visible around this wait; POSIX permits
// spurious wakeups from pthread_cond_wait — confirm this is acceptable.
113 pthread_cond_wait(&_threadLaunchCondition, &_threads_mutex);
114 pthread_mutex_unlock(&_threads_mutex);
116 ostringstream id_sst;
118 return JobId(this, id_sst.str());
121 // Job control method: removes a job from the manager.
// The job reference string is parsed through an istringstream, and the matching
// thread identifier is read from _threads under the mutex.
// NOTE(review): what is done with thread_id afterwards is not visible here — confirm.
122 void BatchManager_Local::deleteJob(const JobId & jobid)
126 istringstream iss(jobid.getReference());
129 // Look up the thread_id of the thread
132 // @@@ --------> CRITICAL SECTION <-------- @@@
133 pthread_mutex_lock(&_threads_mutex);
// find() is checked first so that operator[] does not insert an entry for an unknown id.
134 if (_threads.find(id) != _threads.end())
135 thread_id = _threads[id].thread_id;
136 pthread_mutex_unlock(&_threads_mutex);
137 // @@@ --------> CRITICAL SECTION <-------- @@@
142 // Job control method: suspends a queued job.
// Pushes a HOLD command onto the command queue of the thread that handles the
// job; the command is silently dropped when the id is not in _threads.
143 void BatchManager_Local::holdJob(const JobId & jobid)
146 istringstream iss(jobid.getReference());
149 UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
151 // Push a command onto the thread's queue
152 // @@@ --------> CRITICAL SECTION <-------- @@@
153 pthread_mutex_lock(&_threads_mutex);
154 if (_threads.find(id) != _threads.end())
155 _threads[id].command_queue.push(HOLD);
156 pthread_mutex_unlock(&_threads_mutex);
157 // @@@ --------> CRITICAL SECTION <-------- @@@
160 // Job control method: releases a suspended job.
// Pushes a RELEASE command onto the command queue of the thread that handles the
// job; the command is silently dropped when the id is not in _threads.
161 void BatchManager_Local::releaseJob(const JobId & jobid)
164 istringstream iss(jobid.getReference());
167 UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
169 // Push a command onto the thread's queue
170 // @@@ --------> CRITICAL SECTION <-------- @@@
171 pthread_mutex_lock(&_threads_mutex);
172 if (_threads.find(id) != _threads.end())
173 _threads[id].command_queue.push(RELEASE);
174 pthread_mutex_unlock(&_threads_mutex);
175 // @@@ --------> CRITICAL SECTION <-------- @@@
179 // Job control method: modifies a queued job.
// General overload taking both the parameters and the environment; the two other
// overloads delegate to this one.
// NOTE(review): the body is not visible here — confirm what it actually does.
180 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
184 // Job control method: modifies a queued job.
// Convenience overload: forwards to the three-argument overload with a
// default-constructed Environnement.
185 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
187 alterJob(jobid, param, Environnement());
190 // Job control method: modifies a queued job.
// Convenience overload: forwards to the three-argument overload with a
// default-constructed Parametre.
191 void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
193 alterJob(jobid, Parametre(), env);
198 // Job control method: returns the state of the job.
// Copies the job's parameters and environment out of _threads while holding the
// mutex, then builds a JobInfo_Local from the copies.
// Throws InvalidArgumentException when the id is not present in _threads.
199 JobInfo BatchManager_Local::queryJob(const JobId & jobid)
202 istringstream iss(jobid.getReference());
208 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
209 // @@@ --------> CRITICAL SECTION <-------- @@@
210 pthread_mutex_lock(&_threads_mutex);
211 std::map<Id, Child >::iterator pos = _threads.find(id);
212 bool found = (pos != _threads.end());
// NOTE(review): these two copies dereference pos; presumably they are guarded by
// `found` on a line not visible here — confirm.
214 param = pos->second.param;
215 env = pos->second.env;
217 pthread_mutex_unlock(&_threads_mutex);
218 // @@@ --------> CRITICAL SECTION <-------- @@@
219 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
// The exception is thrown only after the mutex has been released.
221 if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
223 JobInfo_Local ji(param, env);
229 // Job control method: tests whether a job is present on the machine.
// Returns true when the status recorded for the job in _threads is RUNNING.
230 bool BatchManager_Local::isRunning(const JobId & jobid)
233 istringstream iss(jobid.getReference());
238 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
239 // @@@ --------> CRITICAL SECTION <-------- @@@
240 pthread_mutex_lock(&_threads_mutex);
// NOTE(review): unlike deleteJob/holdJob/releaseJob/queryJob there is no find()
// guard here; operator[] on the std::map inserts a default entry for an unknown
// id — confirm whether that is intended.
241 status = _threads[id].status;
242 pthread_mutex_unlock(&_threads_mutex);
243 // @@@ --------> CRITICAL SECTION <-------- @@@
244 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
246 return (status == RUNNING);
249 // Job destruction method.
// Requests cancellation of the given thread. The job threads use deferred
// cancellation (set in ThreadAdapter::run), so the request takes effect at a
// cancellation point such as the pthread_testcancel() call in ThreadAdapter::pere.
250 void BatchManager_Local::cancel(pthread_t thread_id)
252 pthread_cancel(thread_id);
// Builds the argument vector used to launch the job described by param.
// A shell command string is assembled ("cd WORKDIR && EXECUTABLE", followed by
// the arguments and the stdin/stdout/stderr redirections) and handed to the
// protocol's getExecCommandArgs together with the execution host and the user.
// NOTE(review): parameter values are inserted into the command string without any
// quoting or escaping visible here — confirm the inputs are trusted.
256 vector<string> BatchManager_Local::exec_command(const Parametre & param) const
258 ostringstream exec_sub_cmd;
// Drive handling: when WORKDIR carries a drive specification, emit it first so
// that the drive is selected before the "cd".
// NOTE(review): _splitpath is Windows-specific; presumably this part sits behind
// a platform guard that is not visible here — confirm.
261 char drive[_MAX_DRIVE];
262 _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
263 if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
266 exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
// Append each argument, separated by a single space.
268 if (param.find(ARGUMENTS) != param.end()) {
269 Versatile V = param[ARGUMENTS];
270 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
271 StringType argt = * static_cast<StringType *>(*it);
273 exec_sub_cmd << " " << arg;
// An input file whose remote name is "stdin" becomes a stdin redirection.
277 if (param.find(INFILE) != param.end()) {
278 Versatile V = param[INFILE];
279 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
280 Couple cpl = * static_cast<CoupleType*>(*it);
281 string remote = cpl.getRemote();
282 if (remote == "stdin")
283 exec_sub_cmd << " <stdin";
// Output files whose remote name is "stdout" / "stderr" become redirections.
287 if (param.find(OUTFILE) != param.end()) {
288 Versatile V = param[OUTFILE];
289 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
290 Couple cpl = * static_cast<CoupleType*>(*it);
291 string remote = cpl.getRemote();
292 if (remote == "stdout") exec_sub_cmd << " 1>stdout";
293 if (remote == "stderr") exec_sub_cmd << " 2>stderr";
// USER is optional: it is read only when present in param.
298 Parametre::const_iterator it = param.find(USER);
299 if (it != param.end()) {
300 user = string(it->second);
303 return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
308 // Constructor of the ThreadAdapter class.
// Stores the owning batch manager, the job and the job identifier in the
// _bm, _job and _id members.
309 BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
310 _bm(bm), _job(job), _id(id)
317 // Thread execution method.
// Entry point of a job thread; arg is the ThreadAdapter created by submitJob.
// Visible steps: block signals, enable deferred cancellation, register the
// cleanup handlers, copy the input files, launch the child process, copy the
// output files, remove the copied files, then pop the cleanup handlers.
318 void * BatchManager_Local::ThreadAdapter::run(void * arg)
321 // Block all signals for this thread
323 sigfillset(&setmask);
324 pthread_sigmask(SIG_BLOCK, &setmask, NULL);
327 // Allow deferred cancellation of the thread
328 // (these are the default values, but we force them as a precaution)
329 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
330 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
332 // Register the function that kills the child if the thread is stopped.
333 // This function is called automatically when a cancellation request
334 // is acted upon by pthread_testcancel()
336 pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
337 pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
339 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
344 // Cumulative (ORed) return code of all the calls;
345 // zero when every operation succeeded
348 // This table holds the list of files to delete at the end of the process
349 std::vector<string> files_to_delete;
353 // Copy the input files for the child
354 const Parametre param = p_ta->_job.getParametre();
355 Parametre::const_iterator it;
357 // Initialize the workdir variable to the current working directory
// NOTE(review): the return value of getcwd is not checked on this line.
363 getcwd(cwd, PATH_MAX);
365 string workdir = cwd;
// WORKDIR, when present in the job parameters, overrides the current directory.
368 if ( (it = param.find(WORKDIR)) != param.end() ) {
369 workdir = static_cast<string>( (*it).second );
372 string executionhost = string(param[EXECUTIONHOST]);
374 if ( (it = param.find(USER)) != param.end() ) {
375 user = string(it->second);
378 if ( (it = param.find(INFILE)) != param.end() ) {
379 Versatile V = (*it).second;
380 Versatile::iterator Vit;
382 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
383 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
385 string local = cp.getLocal();
386 string remote = cp.getRemote();
// Trace of the destination directory and file name on stderr.
388 std::cerr << workdir << std::endl;
389 std::cerr << remote << std::endl;
// Copy the local file to workdir/remote on the execution host; the two empty
// strings are presumably the source host and user — TODO confirm against copyFile.
391 int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
392 workdir + "/" + remote,
393 executionhost, user);
398 // Record the file as one to be deleted
399 files_to_delete.push_back(workdir + "/" + remote);
408 // Fork/exec a new process so that the child can be controlled
409 // (more finely than with a call to system)
410 // int rc = system(commande.c_str());
411 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
412 //execv("/usr/bin/ssh", parmList);
// NOTE(review): presumably the Windows branch of a platform guard that is not
// visible here — confirm.
414 child = p_ta->launchWin32ChildProcess();
418 if (child < 0) { // error
419 UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
421 } else if (child > 0) { // parent
430 // Copy the output files of the child
431 if ( (it = param.find(OUTFILE)) != param.end() ) {
432 Versatile V = (*it).second;
433 Versatile::iterator Vit;
435 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
436 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
438 string local = cp.getLocal();
439 string remote = cp.getRemote();
441 int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
448 // Record the file as one to be deleted
449 files_to_delete.push_back(workdir + "/" + remote);
455 // Remove the child's input and output files if the previous copies succeeded
456 // or if the child could not be created
457 if ( (rc == 0) || (child < 0) ) {
// This iterator shadows the Parametre iterator `it` declared earlier in the function.
458 std::vector<string>::const_iterator it;
459 for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
460 p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
461 /* string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
462 UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
464 remove_cmd = string("\"") + remove_cmd + string("\"");
466 system(remove_cmd.c_str());*/
472 // Remove the memory cleanup function
// The argument 0 pops the handler without executing it.
473 pthread_cleanup_pop(0);
475 // Remove the function that kills the child
476 pthread_cleanup_pop(0);
480 // Invoke the memory cleanup function
483 UNDER_LOCK( cout << "Father is leaving" << endl );
// Parent-side supervision of a launched child process.
// Registers the job in _bm._threads, signals the submitting thread through
// _threadLaunchCondition, then monitors the child: it records status changes,
// honours thread cancellation, enforces MAXWALLTIME and executes the commands
// queued for this job (HOLD, RELEASE, TERM, KILL).
491 void BatchManager_Local::ThreadAdapter::pere(Process child)
// Start time of the child, used for the wallclock limit check below.
493 time_t child_starttime = time(NULL);
495 // Register the child in the thread table
496 pthread_t thread_id = pthread_self();
498 Parametre param = _job.getParametre();
499 Environnement env = _job.getEnvironnement();
501 ostringstream id_sst;
503 param[ID] = id_sst.str();
504 param[STATE] = Batch::RUNNING;
// NOTE(review): no lock on _threads_mutex is visible around the following writes
// to _bm._threads — confirm how they are protected.
509 _bm._threads[_id].thread_id = thread_id;
511 _bm._threads[_id].pid = child;
513 _bm._threads[_id].status = RUNNING;
514 _bm._threads[_id].param = param;
515 _bm._threads[_id].env = env;
516 _bm._threads[_id].command_queue.push(NOP);
518 // Unlock the master thread. From here, all shared variables must be protected
519 // from concurrent access
520 pthread_cond_signal(&_bm._threadLaunchCondition);
523 // Loop until the child has terminated
// Windows variant: poll the exit code of the process handle.
// NOTE(review): `res` is not tested by any statement visible here.
527 BOOL res = GetExitCodeProcess(child, &exitCode);
528 if (exitCode != STILL_ACTIVE) {
529 pthread_mutex_lock(&_bm._threads_mutex);
530 _bm._threads[_id].status = DONE;
531 _bm._threads[_id].param[STATE] = Batch::FINISHED;
532 pthread_mutex_unlock(&_bm._threads_mutex);
533 // @@@ --------> CRITICAL SECTION <-------- @@@
534 UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
// POSIX variant: non-blocking waitpid (WNOHANG) on the child.
539 pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
540 if (child_wait_rc > 0) {
541 UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
542 UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
543 UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
544 UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
545 UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
546 UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
547 UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
549 UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
551 if (WIFSTOPPED(child_rc)) {
552 // NOTE: to enter this section, the WUNTRACED flag must be set in the
553 // waitpid call above. This flag is currently disabled because, while it is
554 // possible to detect that a process has stopped, it is harder to detect
555 // that it has resumed.
557 // The child is merely stopped
558 // @@@ --------> CRITICAL SECTION <-------- @@@
559 pthread_mutex_lock(&_bm._threads_mutex);
560 _bm._threads[_id].status = STOPPED;
561 _bm._threads[_id].param[STATE] = Batch::PAUSED;
562 pthread_mutex_unlock(&_bm._threads_mutex);
563 // @@@ --------> CRITICAL SECTION <-------- @@@
564 UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
568 // The child has terminated: leave the loop and the enclosing if
569 // @@@ --------> CRITICAL SECTION <-------- @@@
570 pthread_mutex_lock(&_bm._threads_mutex);
571 _bm._threads[_id].status = DONE;
572 _bm._threads[_id].param[STATE] = Batch::FINISHED;
573 pthread_mutex_unlock(&_bm._threads_mutex);
574 // @@@ --------> CRITICAL SECTION <-------- @@@
575 UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
579 else if (child_wait_rc == -1) {
580 // The child has disappeared ...
581 // @@@ --------> CRITICAL SECTION <-------- @@@
582 pthread_mutex_lock(&_bm._threads_mutex);
583 _bm._threads[_id].status = DEAD;
584 _bm._threads[_id].param[STATE] = Batch::FAILED;
585 pthread_mutex_unlock(&_bm._threads_mutex);
586 // @@@ --------> CRITICAL SECTION <-------- @@@
587 UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
592 // Test whether the thread must be destroyed
593 pthread_testcancel();
597 // Check whether the child has exceeded its allotted time (wallclock time)
598 time_t child_currenttime = time(NULL);
599 time_t child_elapsedtime = child_currenttime - child_starttime;
// `param` here is the local copy taken from _job at the top of this function.
600 if (param.find(MAXWALLTIME) != param.end()) {
601 int maxwalltime = param[MAXWALLTIME];
602 // cout << "child_starttime = " << child_starttime << endl
603 // << "child_currenttime = " << child_currenttime << endl
604 // << "child_elapsedtime = " << child_elapsedtime << endl
605 // << "maxwalltime = " << maxwalltime << endl
606 // << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
607 if (child_elapsedtime > int(maxwalltime * 1.1) ) { // Allow a 10% margin before the KILL
608 UNDER_LOCK( cout << "Father is sending KILL command to the thread " << _id << endl );
609 // Push a command onto the thread's queue
610 // @@@ --------> CRITICAL SECTION <-------- @@@
611 pthread_mutex_lock(&_bm._threads_mutex);
612 if (_bm._threads.find(_id) != _bm._threads.end())
613 _bm._threads[_id].command_queue.push(KILL);
614 pthread_mutex_unlock(&_bm._threads_mutex);
615 // @@@ --------> CRITICAL SECTION <-------- @@@
// Past the limit but still within the 10% margin: ask for a TERM instead.
618 } else if (child_elapsedtime > maxwalltime ) {
619 UNDER_LOCK( cout << "Father is sending TERM command to the thread " << _id << endl );
620 // Push a command onto the thread's queue
621 // @@@ --------> CRITICAL SECTION <-------- @@@
622 pthread_mutex_lock(&_bm._threads_mutex);
623 if (_bm._threads.find(_id) != _bm._threads.end())
624 _bm._threads[_id].command_queue.push(TERM);
625 pthread_mutex_unlock(&_bm._threads_mutex);
626 // @@@ --------> CRITICAL SECTION <-------- @@@
632 // Check whether there is anything to do in the command queue
633 // @@@ --------> CRITICAL SECTION <-------- @@@
// The whole queue is drained, and the signals are sent, while the mutex is held.
634 pthread_mutex_lock(&_bm._threads_mutex);
635 if (_bm._threads.find(_id) != _bm._threads.end()) {
636 while (_bm._threads[_id].command_queue.size() > 0) {
637 Commande cmd = _bm._threads[_id].command_queue.front();
638 _bm._threads[_id].command_queue.pop();
642 UNDER_LOCK( cout << "Father does nothing to his child" << endl );
646 UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
647 kill(child, SIGSTOP);
651 UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
652 kill(child, SIGCONT);
656 UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
657 kill(child, SIGTERM);
661 UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
662 kill(child, SIGKILL);
674 pthread_mutex_unlock(&_bm._threads_mutex);
675 // @@@ --------> CRITICAL SECTION <-------- @@@
677 // Pause briefly so as not to load the processor needlessly
// Child-side code: builds argv and envp for the job, applies the optional
// resource limits (MAXCPUTIME, MAXDISKSIZE, MAXRAMSIZE) and replaces the process
// image through execve. GenericException is caught and reported on stderr.
692 void BatchManager_Local::ThreadAdapter::fils()
694 Parametre param = _job.getParametre();
695 Parametre::iterator it;
697 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
698 //int result = execv("/usr/bin/ssh", parmList);
699 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
700 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
703 // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
704 vector<string> command;
705 if (param.find(EXECUTABLE) != param.end()) {
706 command = _bm.exec_command(param);
709 // Build the argument array argv from the command
710 char ** argv = new char * [command.size() + 1];
// Each argument is copied into its own buffer; comstr collects the arguments
// joined with " # " for the debug trace below.
712 for (string::size_type i=0 ; i<command.size() ; i++) {
713 argv[i] = new char[command[i].size() + 1];
714 strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
715 if (i>0) comstr += " # ";
716 comstr += command[i];
// execve expects a NULL-terminated argument array.
719 argv[command.size()] = NULL;
721 UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
// NOTE(review): argv[0] is NULL when command is empty; presumably the missing
// EXECUTABLE case exits before reaching this trace — confirm.
722 UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
724 // Create the environment for the new process. Note (RB): Here we change the environment for
725 // the process launched in local. It would seem more logical to set the environment for the
727 Environnement env = _job.getEnvironnement();
731 envp = new char * [env.size() + 1]; // 1 for the terminating NULL
// Each entry is formatted as "key=value".
733 for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
734 const string & key = (*it).first;
735 const string & value = (*it).second;
737 oss << key << "=" << value;
738 envp[i] = new char [oss.str().size() + 1];
739 strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
742 // assert (i == env.size())
746 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
747 //int result = execv("/usr/bin/ssh", parmList);
748 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
749 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
753 // Set the system limits imposed on the child
// In each case the hard limit is set about 10% above the soft limit.
// NOTE(review): the return values of setrlimit are not checked on these lines.
754 if (param.find(MAXCPUTIME) != param.end()) {
755 int maxcputime = param[MAXCPUTIME];
757 limit.rlim_cur = maxcputime;
758 limit.rlim_max = int(maxcputime * 1.1);
759 setrlimit(RLIMIT_CPU, &limit);
762 if (param.find(MAXDISKSIZE) != param.end()) {
763 int maxdisksize = param[MAXDISKSIZE];
// NOTE(review): the multiplication by 1024 is done in int and may overflow for
// large values — confirm the expected range of MAXDISKSIZE.
765 limit.rlim_cur = maxdisksize * 1024;
766 limit.rlim_max = int(maxdisksize * 1.1) * 1024;
767 setrlimit(RLIMIT_FSIZE, &limit);
770 if (param.find(MAXRAMSIZE) != param.end()) {
771 int maxramsize = param[MAXRAMSIZE];
// NOTE(review): same int overflow concern as for MAXDISKSIZE above.
773 limit.rlim_cur = maxramsize * 1024;
774 limit.rlim_max = int(maxramsize * 1.1) * 1024;
775 setrlimit(RLIMIT_AS, &limit);
779 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
780 //int result = execv("/usr/bin/ssh", parmList);
781 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
782 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
784 // Create a session for the child so that it is not destroyed when the
785 // shell terminates (the shell opens a session and kills every process
786 // belonging to that session when it exits)
790 // Close the standard file descriptors
791 //close(STDIN_FILENO);
792 //close(STDOUT_FILENO);
793 //close(STDERR_FILENO);
796 // Execute the child's command
797 int result = execve(argv[0], argv, envp);
// execve only returns on failure, so the statements below are the error path.
798 UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
799 // No need to deallocate since nothing happens after a successful exec
801 // Normally we should never get here
802 ofstream file_err("error.log");
803 UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
805 } catch (GenericException & e) {
807 std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
// Windows implementation of the child launch: builds a single command line from
// the exec_command() argument vector, optionally builds an environment block
// from the job's Environnement, and starts the process with CreateProcess.
// RunTimeException is thrown when EXECUTABLE is missing or CreateProcess fails;
// GenericException is caught at the end of the function and reported on stderr.
815 BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
817 Parametre param = _job.getParametre();
818 Parametre::iterator it;
819 PROCESS_INFORMATION pi;
823 // EXECUTABLE is MANDATORY, if missing, we throw an exception
824 vector<string> exec_command;
825 if (param.find(EXECUTABLE) != param.end()) {
826 exec_command = _bm.exec_command(param);
828 throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
831 // Build the command string from the command argument vector
// Arguments are joined with single spaces; no quoting is applied on these lines.
833 for (unsigned int i=0 ; i<exec_command.size() ; i++) {
834 if (i>0) comstr += " ";
835 comstr += exec_command[i];
838 UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
840 // Create the environment for the new process. Note (RB): Here we change the environment for
841 // the process launched in local. It would seem more logical to set the environment for the
843 // Note that if no environment is specified, we reuse the current environment.
844 Environnement env = _job.getEnvironnement();
845 char * chNewEnv = NULL;
// The block is a sequence of "key=value" strings, each NUL-terminated, followed
// by one extra NUL byte.
// NOTE(review): the buffer is a fixed 4096 bytes and no bounds check is visible
// in the loop below — a large environment would overflow it; confirm.
// NOTE(review): chNewEnv is not visibly released after CreateProcess — confirm.
848 chNewEnv = new char[4096];
849 LPTSTR lpszCurrentVariable = chNewEnv;
850 for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
851 const string & key = (*it).first;
852 const string & value = (*it).second;
853 string envvar = key + "=" + value;
854 envvar.copy(lpszCurrentVariable, envvar.size());
855 lpszCurrentVariable[envvar.size()] = '\0';
856 lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
858 // Terminate the block with a NULL byte.
859 *lpszCurrentVariable = '\0';
864 ZeroMemory( &si, sizeof(si) );
866 ZeroMemory( &pi, sizeof(pi) );
868 // Copy the command to a non-const buffer
869 char * buffer = strdup(comstr.c_str());
871 // launch the new process
872 BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
873 CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
// The duplicated command line is released whether or not the launch succeeded.
875 if (buffer) free(buffer);
876 if (!res) throw RunTimeException("Error while creating new process");
// The primary thread handle is closed right after the launch.
878 CloseHandle(pi.hThread);
880 } catch (GenericException & e) {
882 std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
// Thread cleanup handler registered in ThreadAdapter::run: p_pid points to the
// child process identifier, and the child is sent SIGTERM.
891 void BatchManager_Local::kill_child_on_exit(void * p_pid)
894 //TODO: porting of following functionality
895 pid_t child = * static_cast<pid_t *>(p_pid);
898 kill(child, SIGTERM);
900 // Note: we could also follow this with a kill(child, SIGKILL),
901 // but this option is not implemented for now, because it is
902 // preferable to let the child process terminate normally on its own.
// Thread cleanup handler registered in ThreadAdapter::run: arg is the
// ThreadAdapter pointer that was handed to the thread.
// NOTE(review): presumably the adapter is deleted here; the statement that does
// so is not visible — confirm.
906 void BatchManager_Local::delete_on_exit(void * arg)
908 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);