1 // Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 * BatchManager_Local.cxx :
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail : mailto:ivan.dutka-malen@der.edf.fr
27 * Date : Thu Nov 6 10:17:22 2003
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
41 #include <sys/types.h>
44 #include "Batch_RunTimeException.hxx"
54 #include "Batch_IOMutex.hxx"
55 #include "Batch_BatchManager_Local.hxx"
// Constructor: creates a local batch manager for `host`.
// The CommunicationProtocol instance matching `protocolType` is stored in _protocol;
// exec_command() later uses it to build the command that launches a job.
// The mutexes guarding the thread table (_threads) and the thread_id -> Id association
// list, and the condition variable used to wait for a registration, are initialized
// with default attributes.
// NOTE(review): the return codes of the pthread_*_init calls are ignored.
63 BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
64 CommunicationProtocolType protocolType)
65 : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
66 _protocol(CommunicationProtocol::getInstance(protocolType)),
67 _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
69 pthread_mutex_init(&_threads_mutex, NULL);
70 pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
71 pthread_cond_init(&_thread_id_id_association_cond, NULL);
// Destructor: releases the pthread mutexes and condition variable created in the constructor.
// NOTE(review): job threads are created detached and keep a reference to this manager
// (ThreadAdapter::_bm); nothing visible here cancels or waits for them — confirm that
// callers never destroy the manager while jobs are still running.
75 BatchManager_Local::~BatchManager_Local()
77 pthread_mutex_destroy(&_threads_mutex);
78 pthread_mutex_destroy(&_thread_id_id_association_mutex);
79 pthread_cond_destroy(&_thread_id_id_association_cond);
// Returns the communication protocol this manager uses to copy and remove job files
// (body not visible in this listing; presumably returns _protocol).
82 const CommunicationProtocol & BatchManager_Local::getProtocol() const
87 // Job control method: submits a job to the manager
// The Job is converted to a Job_Local and launched in a new thread by submit(); the call
// then blocks in getIdByThread_id() until that thread has registered its unique Id, and
// that Id, formatted as text, becomes the reference of the returned JobId.
// NOTE(review): in the visible code the job thread registers itself only from pere(),
// i.e. after the child process was launched successfully; if launching fails,
// getIdByThread_id() may wait indefinitely — verify.
88 const JobId BatchManager_Local::submitJob(const Job & job)
90 Job_Local jobLocal = job;
92 pthread_t thread_id = submit(jobLocal);
95 oss << getIdByThread_id(thread_id);
97 JobId id(this, oss.str());
102 // Job control method: removes a job from the manager
// The JobId reference is presumably parsed back into the internal Id (those lines are not
// visible), then the pthread id of the thread that monitors the job is looked up in
// _threads under _threads_mutex. The code that acts on thread_id is not visible in this
// listing (presumably a call to cancel()).
// NOTE(review): for an unknown Id, thread_id keeps whatever value it was initialized with
// (initialization not visible) — confirm it is never passed to pthread_cancel in that case.
103 void BatchManager_Local::deleteJob(const JobId & jobid)
107 istringstream iss(jobid.getReference());
110 // Retrieve the thread's thread_id
113 // @@@ --------> CRITICAL SECTION <-------- @@@
114 pthread_mutex_lock(&_threads_mutex);
115 if (_threads.find(id) != _threads.end())
116 thread_id = _threads[id].thread_id;
117 pthread_mutex_unlock(&_threads_mutex);
118 // @@@ --------> CRITICAL SECTION <-------- @@@
123 // Job control method: suspends a queued job
// Pushes a HOLD command onto the command queue of the job's monitoring thread (under
// _threads_mutex); the thread's pere() loop dequeues it later (presumably by sending
// SIGSTOP to the child). Unknown Ids are silently ignored.
124 void BatchManager_Local::holdJob(const JobId & jobid)
127 istringstream iss(jobid.getReference());
130 UNDER_LOCK( cout << "BatchManager is sending HOLD command to the thread " << id << endl );
132 // Push a command onto the thread's queue
133 // @@@ --------> CRITICAL SECTION <-------- @@@
134 pthread_mutex_lock(&_threads_mutex);
135 if (_threads.find(id) != _threads.end())
136 _threads[id].command_queue.push(HOLD);
137 pthread_mutex_unlock(&_threads_mutex);
138 // @@@ --------> CRITICAL SECTION <-------- @@@
141 // Job control method: releases a suspended job
// Pushes a RELEASE command onto the command queue of the job's monitoring thread (under
// _threads_mutex); the thread's pere() loop dequeues it later (presumably by sending
// SIGCONT to the child). Unknown Ids are silently ignored.
142 void BatchManager_Local::releaseJob(const JobId & jobid)
145 istringstream iss(jobid.getReference());
148 UNDER_LOCK( cout << "BatchManager is sending RELEASE command to the thread " << id << endl );
150 // Push a command onto the thread's queue
151 // @@@ --------> CRITICAL SECTION <-------- @@@
152 pthread_mutex_lock(&_threads_mutex);
153 if (_threads.find(id) != _threads.end())
154 _threads[id].command_queue.push(RELEASE);
155 pthread_mutex_unlock(&_threads_mutex);
156 // @@@ --------> CRITICAL SECTION <-------- @@@
160 // Job control method: alters a queued job
// Full overload taking both parameters and environment; its body is not visible in this
// listing. The two-argument overloads forward to it.
161 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param, const Environnement & env)
165 // Job control method: alters a queued job
// Convenience overload: forwards to the three-argument alterJob() with a
// default-constructed Environnement.
166 void BatchManager_Local::alterJob(const JobId & jobid, const Parametre & param)
168 alterJob(jobid, param, Environnement());
171 // Job control method: alters a queued job
// Convenience overload: forwards to the three-argument alterJob() with a
// default-constructed Parametre.
172 void BatchManager_Local::alterJob(const JobId & jobid, const Environnement & env)
174 alterJob(jobid, Parametre(), env);
179 // Job control method: returns the state of the job
// Copies the job's Parametre and Environnement out of _threads under _threads_mutex and
// wraps them into a JobInfo_Local.
// NOTE(review): _threads[id] is used without a prior find(); if _threads is a std::map-like
// container, querying an unknown Id default-inserts an empty entry into the thread table.
// holdJob()/releaseJob() guard with find() first — consider doing the same here.
180 JobInfo BatchManager_Local::queryJob(const JobId & jobid)
183 istringstream iss(jobid.getReference());
189 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
190 // @@@ --------> CRITICAL SECTION <-------- @@@
191 pthread_mutex_lock(&_threads_mutex);
192 param = _threads[id].param;
193 env = _threads[id].env;
194 pthread_mutex_unlock(&_threads_mutex);
195 // @@@ --------> CRITICAL SECTION <-------- @@@
196 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
198 JobInfo_Local ji(param, env);
204 // Job control method: tests whether a job is present on the machine
// Returns true only when the recorded status is RUNNING; any other status
// (e.g. STOPPED, DONE, DEAD) yields false.
// NOTE(review): same unguarded _threads[id] access as queryJob(); an unknown Id may be
// default-inserted, and the result then depends on the default value of status.
205 bool BatchManager_Local::isRunning(const JobId & jobid)
208 istringstream iss(jobid.getReference());
213 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
214 // @@@ --------> CRITICAL SECTION <-------- @@@
215 pthread_mutex_lock(&_threads_mutex);
216 status = _threads[id].status;
217 pthread_mutex_unlock(&_threads_mutex);
218 // @@@ --------> CRITICAL SECTION <-------- @@@
219 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
221 return (status == RUNNING);
225 // Job execution method
// Starts a detached thread that runs the job. A heap-allocated ThreadAdapter carrying the
// job is handed to the thread as its argument (the start routine is on lines not visible
// here, presumably ThreadAdapter::run, which registers delete_on_exit as a cleanup handler
// on that argument). Presumably returns the new thread's pthread id.
// NOTE(review): the handling of pthread_create's return code (rc) is not visible; if thread
// creation fails, p_ta is presumably leaked and thread_id does not name a valid thread.
226 pthread_t BatchManager_Local::submit(const Job_Local & job)
228 // The id of the thread to create
229 pthread_t thread_id =
236 // Thread creation attributes
237 pthread_attr_t thread_attr;
238 pthread_attr_init(&thread_attr);
239 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
241 ThreadAdapter * p_ta = new ThreadAdapter(*this, job);
243 // Create the thread that will execute the system command passed to it
244 int rc = pthread_create(&thread_id,
247 static_cast<void *>(p_ta));
251 // Release the thread attributes, which are no longer needed
252 pthread_attr_destroy(&thread_attr);
258 // Job destruction method
// Requests cancellation of the job's monitoring thread. Job threads use deferred
// cancellation, so the request takes effect at the next cancellation point (e.g. the
// pthread_testcancel() call in pere()), where the kill_child_on_exit cleanup handler
// sends SIGTERM to the child.
// NOTE(review): job threads are detached, so once a thread has exited its pthread_t may be
// reused by an unrelated thread (see the comment above nextId()); cancelling a stale id
// is unsafe.
259 void BatchManager_Local::cancel(pthread_t thread_id)
261 pthread_cancel(thread_id);
// Builds the argument vector used to execute the job's command.
// The sub-command has the form "cd <WORKDIR> && <EXECUTABLE> <ARGUMENTS...>". In the part
// that uses _splitpath (presumably Windows-only; the guard is not visible), the drive of
// WORKDIR is prepended as "<drive> && ". The sub-command, EXECUTIONHOST and the optional
// USER parameter are passed to _protocol.getExecCommandArgs(), whose result is returned.
// NOTE(review): WORKDIR and the arguments are concatenated without quoting; paths or
// arguments containing spaces or shell metacharacters will presumably be split or
// interpreted by whatever shell runs this command.
265 vector<string> BatchManager_Local::exec_command(const Parametre & param) const
267 ostringstream exec_sub_cmd;
270 char drive[_MAX_DRIVE];
271 _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
272 if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
275 exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
277 if (param.find(ARGUMENTS) != param.end()) {
278 Versatile V = param[ARGUMENTS];
279 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
280 StringType argt = * static_cast<StringType *>(*it);
282 exec_sub_cmd << " " << arg;
287 Parametre::const_iterator it = param.find(USER);
288 if (it != param.end()) {
289 user = string(it->second);
292 return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
296 // Builds a unique identifier for threads, since a thread_id is not unique
297 // over time (it can be reused once a thread terminates)
298 // WARNING: this method is protected only by the critical section of the
299 // Thread_id / Id association (_thread_id_id_association_mutex)
// i.e. it must only be called with that mutex held (registerThread_id() calls it that way).
300 BatchManager_Local::Id BatchManager_Local::nextId()
304 //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::nextId() : Id = " << nextId << endl );
309 // Returns the Id stored in the Thread_id / Id association and removes that entry immediately
// Blocks on _thread_id_id_association_cond until registerThread_id() has recorded an entry
// for thread_id. The list is re-scanned after every wake-up, because a wake-up may concern
// another thread's registration (or be spurious).
310 BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id)
312 // @@@ --------> CRITICAL SECTION <-------- @@@
313 pthread_mutex_lock(&_thread_id_id_association_mutex);
314 bool threadIdFound = false;
315 std::list<struct ThreadIdIdAssociation>::iterator it;
316 while (!threadIdFound) {
// Linear search for the entry matching thread_id
317 for (it = _thread_id_id_association.begin() ;
318 it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
320 if (it == _thread_id_id_association.end())
321 pthread_cond_wait(&_thread_id_id_association_cond, &_thread_id_id_association_mutex);
323 threadIdFound = true;
327 _thread_id_id_association.erase(it);
329 pthread_mutex_unlock(&_thread_id_id_association_mutex);
330 // @@@ --------> CRITICAL SECTION <-------- @@@
332 //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::getIdByThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
337 // Associates a Thread_id with a newly created Id
// Called by each job thread (from pere()) to publish its Id. If no entry exists yet for
// thread_id, a fresh Id from nextId() is recorded (nextId() relies on the mutex held here)
// and the waiters in getIdByThread_id() are woken. An existing entry for the same
// thread_id is reported as a pthread inconsistency.
338 BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id)
342 // @@@ --------> CRITICAL SECTION <-------- @@@
343 pthread_mutex_lock(&_thread_id_id_association_mutex);
344 std::list<struct ThreadIdIdAssociation>::iterator it;
345 for (it = _thread_id_id_association.begin() ;
346 it != _thread_id_id_association.end() && !pthread_equal(it->threadId, thread_id) ;
348 if (it == _thread_id_id_association.end()) {
349 struct ThreadIdIdAssociation newAssociation;
350 id = newAssociation.id = nextId();
351 newAssociation.threadId = thread_id;
352 _thread_id_id_association.push_back(newAssociation);
// Wake ALL waiters: several submitJob() calls may be blocked in getIdByThread_id() at the
// same time, each waiting for a different thread_id on this single condition variable.
// pthread_cond_signal would wake only one of them, possibly not the one this entry belongs
// to, leaving the right waiter blocked until a later registration (or forever). Each woken
// waiter re-scans the list and goes back to waiting if its entry is not there yet.
353 pthread_cond_broadcast(&_thread_id_id_association_cond);
356 UNDER_LOCK( cerr << "ERROR : Pthread Inconstency. Two threads own the same thread_id." << endl );
358 pthread_mutex_unlock(&_thread_id_id_association_mutex);
359 // @@@ --------> CRITICAL SECTION <-------- @@@
361 //UNDER_LOCK( cout << "BatchManager_Local::Id BatchManager_Local::registerThread_id(pthread_t thread_id) : Id = " << id << " - thread_id = " << thread_id << endl );
366 // Constructor of the ThreadAdapter class
// Binds the adapter to its batch manager and to the job it will run (the member
// initializers are not visible here; presumably they store bm in _bm and job in _job).
367 BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job) :
375 // Thread execution method
// Entry point of a job thread; `arg` is the ThreadAdapter describing the job. Steps visible
// here:
// - block all signals;
// - register the cleanup handlers;
// - copy the INFILE files into workdir through the protocol;
// - launch the child process;
// - copy the OUTFILE files back;
// - remove the copied files when everything succeeded or the child could not be created;
// - pop the cleanup handlers without running them.
376 void * BatchManager_Local::ThreadAdapter::run(void * arg)
379 // Block all signals for this thread
381 sigfillset(&setmask);
382 pthread_sigmask(SIG_BLOCK, &setmask, NULL);
385 // Allow deferred cancellation of the thread
386 // (these are the default values, but they are forced as a precaution)
387 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
388 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
390 // Register the function that kills the child if the thread is stopped.
391 // This function will be called automatically when a cancellation request
392 // is acted upon by pthread_testcancel()
394 pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
395 pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
397 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
402 // The cumulative (ORed) return code of all calls
403 // Zero if all operations succeeded
406 // This table holds the list of files to delete at the end of the process
407 std::vector<string> files_to_delete;
411 // Copy the input files for the child
412 const Parametre param = p_ta->_job.getParametre();
413 Parametre::const_iterator it;
415 // Initialize workdir to the current working directory
// NOTE(review): the return value of getcwd() is not checked.
421 getcwd(cwd, PATH_MAX);
423 string workdir = cwd;
426 if ( (it = param.find(WORKDIR)) != param.end() ) {
427 workdir = static_cast<string>( (*it).second );
430 string executionhost = string(param[EXECUTIONHOST]);
432 if ( (it = param.find(USER)) != param.end() ) {
433 user = string(it->second);
436 if ( (it = param.find(INFILE)) != param.end() ) {
437 Versatile V = (*it).second;
438 Versatile::iterator Vit;
440 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
441 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
443 string local = cp.getLocal();
444 string remote = cp.getRemote();
// NOTE(review): leftover debug output printed unconditionally to std::cerr.
446 std::cerr << workdir << std::endl;
447 std::cerr << remote << std::endl;
449 int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
450 workdir + "/" + remote,
451 executionhost, user);
456 // Record the file as one to delete
457 files_to_delete.push_back(workdir + "/" + remote);
466 // Fork/exec a new process so that the child can be controlled
467 // (more finely than with a system() call)
468 // int rc = system(commande.c_str());
469 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
470 //execv("/usr/bin/ssh", parmList);
472 child = p_ta->launchWin32ChildProcess();
// NOTE(review): on the error path below pere() is not reached, so (in the visible code)
// this thread never calls registerThread_id(); a submitJob() waiting in getIdByThread_id()
// for this thread would then block indefinitely.
476 if (child < 0) { // error
477 UNDER_LOCK( cerr << "Fork impossible (rc=" << child << ")" << endl );
479 } else if (child > 0) { // parent
488 // Copy the child's output files
489 if ( (it = param.find(OUTFILE)) != param.end() ) {
490 Versatile V = (*it).second;
491 Versatile::iterator Vit;
493 for(Vit=V.begin(); Vit!=V.end(); Vit++) {
494 CoupleType cpt = *static_cast< CoupleType * >(*Vit);
496 string local = cp.getLocal();
497 string remote = cp.getRemote();
499 int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
506 // Record the file as one to delete
507 files_to_delete.push_back(workdir + "/" + remote);
513 // Delete the child's input and output files if the previous copies succeeded
514 // or if the child could not be created
515 if ( (rc == 0) || (child < 0) ) {
516 std::vector<string>::const_iterator it;
517 for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
518 p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
519 /* string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
520 UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
522 remove_cmd = string("\"") + remove_cmd + string("\"");
524 system(remove_cmd.c_str());*/
530 // Pop the memory cleanup function (delete_on_exit) without running it
531 pthread_cleanup_pop(0);
533 // Pop the child-killing function (kill_child_on_exit) without running it
534 pthread_cleanup_pop(0);
538 // Invoke the memory cleanup function
541 UNDER_LOCK( cout << "Father is leaving" << endl );
// Monitoring loop run by the job thread once the child process exists.
// - Registers the thread's Id.
// - Publishes the job's thread id, pid, RUNNING status, Parametre and Environnement in
//   _bm._threads.
// - Polls the child: GetExitCodeProcess in the Win32 part, waitpid(WNOHANG) otherwise.
// - Updates status/STATE to Stopped, Done or Dead.
// - Enforces MAXWALLTIME by queuing TERM, then KILL after a 10% margin.
// - Executes queued commands by signalling the child.
// - Honours cancellation requests via pthread_testcancel().
549 void BatchManager_Local::ThreadAdapter::pere(Process child)
551 time_t child_starttime = time(NULL);
553 // Register the child in the thread table
554 pthread_t thread_id = pthread_self();
555 Id id = _bm.registerThread_id(thread_id);
557 Parametre param = _job.getParametre();
558 Environnement env = _job.getEnvironnement();
560 ostringstream thread_id_sst;
562 param[ID] = thread_id_sst.str();
563 param[STATE] = "Running";
568 // @@@ --------> CRITICAL SECTION <-------- @@@
569 pthread_mutex_lock(&_bm._threads_mutex);
570 _bm._threads[id].thread_id = thread_id;
572 _bm._threads[id].pid = child;
574 _bm._threads[id].status = RUNNING;
575 _bm._threads[id].param = param;
576 _bm._threads[id].env = env;
577 _bm._threads[id].command_queue.push(NOP);
578 pthread_mutex_unlock(&_bm._threads_mutex);
579 // @@@ --------> CRITICAL SECTION <-------- @@@
584 // loop until the child has terminated
// NOTE(review): the BOOL result `res` is ignored; if GetExitCodeProcess fails, exitCode may
// hold a stale or uninitialized value (its declaration is not visible here).
588 BOOL res = GetExitCodeProcess(child, &exitCode);
589 if (exitCode != STILL_ACTIVE) {
590 pthread_mutex_lock(&_bm._threads_mutex);
591 _bm._threads[id].status = DONE;
592 _bm._threads[id].param[STATE] = "Done";
593 pthread_mutex_unlock(&_bm._threads_mutex);
594 // @@@ --------> CRITICAL SECTION <-------- @@@
595 UNDER_LOCK( cout << "Father sees his child is DONE: exit code = " << exitCode << endl );
600 pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
601 if (child_wait_rc > 0) {
// Debug dump of the wait status. Each W* value is only meaningful when its matching
// predicate (WIFEXITED, WIFSIGNALED, WIFSTOPPED) is true.
602 UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
603 UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
604 UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
605 UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
606 UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
607 UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
608 UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
610 UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
612 if (WIFSTOPPED(child_rc)) {
613 // NOTE: to enter this section, the WUNTRACED flag must be
614 // set in the waitpid call above. This flag is currently
615 // disabled because, while it is possible to detect that a process stopped,
616 // it is harder to detect that it has resumed.
618 // The child is merely stopped
619 // @@@ --------> CRITICAL SECTION <-------- @@@
620 pthread_mutex_lock(&_bm._threads_mutex);
621 _bm._threads[id].status = STOPPED;
622 _bm._threads[id].param[STATE] = "Stopped";
623 pthread_mutex_unlock(&_bm._threads_mutex);
624 // @@@ --------> CRITICAL SECTION <-------- @@@
625 UNDER_LOCK( cout << "Father sees his child is STOPPED : " << child_wait_rc << endl );
629 // The child has terminated: leave the loop and the enclosing if
630 // @@@ --------> CRITICAL SECTION <-------- @@@
631 pthread_mutex_lock(&_bm._threads_mutex);
632 _bm._threads[id].status = DONE;
633 _bm._threads[id].param[STATE] = "Done";
634 pthread_mutex_unlock(&_bm._threads_mutex);
635 // @@@ --------> CRITICAL SECTION <-------- @@@
636 UNDER_LOCK( cout << "Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")" << endl );
640 else if (child_wait_rc == -1) {
641 // The child has disappeared ...
// NOTE(review): errno is read below only after the mutex lock/unlock (and possibly the
// lock inside UNDER_LOCK); those calls may modify it. Consider saving errno right after
// waitpid().
642 // @@@ --------> CRITICAL SECTION <-------- @@@
643 pthread_mutex_lock(&_bm._threads_mutex);
644 _bm._threads[id].status = DEAD;
645 _bm._threads[id].param[STATE] = "Dead";
646 pthread_mutex_unlock(&_bm._threads_mutex);
647 // @@@ --------> CRITICAL SECTION <-------- @@@
648 UNDER_LOCK( cout << "Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")" << endl );
653 // Check whether the thread must be cancelled (deferred cancellation point)
654 pthread_testcancel();
658 // Check whether the child has exceeded its allowed time (wallclock time)
// NOTE(review): this presumably runs on every polling iteration, so once the limit is
// exceeded TERM (and later KILL) is queued again on each pass and the child may receive
// the same signal repeatedly.
659 time_t child_currenttime = time(NULL);
660 time_t child_elapsedtime = child_currenttime - child_starttime;
661 if (param.find(MAXWALLTIME) != param.end()) {
662 int maxwalltime = param[MAXWALLTIME];
663 // cout << "child_starttime = " << child_starttime << endl
664 // << "child_currenttime = " << child_currenttime << endl
665 // << "child_elapsedtime = " << child_elapsedtime << endl
666 // << "maxwalltime = " << maxwalltime << endl
667 // << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
668 if (child_elapsedtime > int(maxwalltime * 1.1) ) { // Allow a 10% margin before the KILL
669 UNDER_LOCK( cout << "Father is sending KILL command to the thread " << id << endl );
670 // Push a command onto the thread's queue
671 // @@@ --------> CRITICAL SECTION <-------- @@@
672 pthread_mutex_lock(&_bm._threads_mutex);
673 if (_bm._threads.find(id) != _bm._threads.end())
674 _bm._threads[id].command_queue.push(KILL);
675 pthread_mutex_unlock(&_bm._threads_mutex);
676 // @@@ --------> CRITICAL SECTION <-------- @@@
679 } else if (child_elapsedtime > maxwalltime ) {
680 UNDER_LOCK( cout << "Father is sending TERM command to the thread " << id << endl );
681 // Push a command onto the thread's queue
682 // @@@ --------> CRITICAL SECTION <-------- @@@
683 pthread_mutex_lock(&_bm._threads_mutex);
684 if (_bm._threads.find(id) != _bm._threads.end())
685 _bm._threads[id].command_queue.push(TERM);
686 pthread_mutex_unlock(&_bm._threads_mutex);
687 // @@@ --------> CRITICAL SECTION <-------- @@@
693 // Check whether there is anything to do in the command queue
// The queue is drained while _threads_mutex is held; the matching signals are sent to the
// child from inside this critical section.
694 // @@@ --------> CRITICAL SECTION <-------- @@@
695 pthread_mutex_lock(&_bm._threads_mutex);
696 if (_bm._threads.find(id) != _bm._threads.end()) {
697 while (_bm._threads[id].command_queue.size() > 0) {
698 Commande cmd = _bm._threads[id].command_queue.front();
699 _bm._threads[id].command_queue.pop();
703 UNDER_LOCK( cout << "Father does nothing to his child" << endl );
707 UNDER_LOCK( cout << "Father is sending SIGSTOP signal to his child" << endl );
708 kill(child, SIGSTOP);
712 UNDER_LOCK( cout << "Father is sending SIGCONT signal to his child" << endl );
713 kill(child, SIGCONT);
717 UNDER_LOCK( cout << "Father is sending SIGTERM signal to his child" << endl );
718 kill(child, SIGTERM);
722 UNDER_LOCK( cout << "Father is sending SIGKILL signal to his child" << endl );
723 kill(child, SIGKILL);
735 pthread_mutex_unlock(&_bm._threads_mutex);
736 // @@@ --------> CRITICAL SECTION <-------- @@@
738 // Sleep briefly so as not to load the CPU needlessly
// Code run in the child process (presumably called right after fork(); the call site is not
// visible). Steps:
// - build argv from exec_command();
// - build envp from the job's Environnement;
// - apply the MAXCPUTIME / MAXDISKSIZE / MAXRAMSIZE resource limits;
// - execve() the command.
// If execve returns, the failure is written to error.log in the current directory.
// NOTE(review): in a multithreaded process only async-signal-safe functions may be called
// between fork() and exec(). The operator new calls below, and UNDER_LOCK (which presumably
// takes the Batch IO mutex), can deadlock if another thread held the allocator lock or that
// mutex at fork time.
753 void BatchManager_Local::ThreadAdapter::fils()
755 Parametre param = _job.getParametre();
756 Parametre::iterator it;
758 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
759 //int result = execv("/usr/bin/ssh", parmList);
760 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
761 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
764 // EXECUTABLE is MANDATORY, if missing, we exit with failure notification
765 vector<string> command;
766 if (param.find(EXECUTABLE) != param.end()) {
767 command = _bm.exec_command(param);
770 // Build the argument array argv from the command
771 char ** argv = new char * [command.size() + 1];
773 for (string::size_type i=0 ; i<command.size() ; i++) {
774 argv[i] = new char[command[i].size() + 1];
775 strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
776 if (i>0) comstr += " # ";
777 comstr += command[i];
780 argv[command.size()] = NULL;
782 UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
783 UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
785 // Create the environment for the new process. Note (RB): Here we change the environment for
786 // the process launched in local. It would seem more logical to set the environment for the
788 Environnement env = _job.getEnvironnement();
792 envp = new char * [env.size() + 1]; // 1 for the terminating NULL
794 for(Environnement::const_iterator it=env.begin(); it!=env.end(); it++, i++) {
795 const string & key = (*it).first;
796 const string & value = (*it).second;
798 oss << key << "=" << value;
799 envp[i] = new char [oss.str().size() + 1];
800 strncpy(envp[i], oss.str().c_str(), oss.str().size() + 1);
803 // assert (i == env.size())
807 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
808 //int result = execv("/usr/bin/ssh", parmList);
809 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
810 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
814 // Set the system limits imposed on the child (soft limit = value, hard limit = value + 10%)
// NOTE(review): the setrlimit() return codes are ignored, and `maxdisksize * 1024` /
// `maxramsize * 1024` are computed in int, so they can overflow for large values (the units,
// presumably KB, are not visible here).
815 if (param.find(MAXCPUTIME) != param.end()) {
816 int maxcputime = param[MAXCPUTIME];
818 limit.rlim_cur = maxcputime;
819 limit.rlim_max = int(maxcputime * 1.1);
820 setrlimit(RLIMIT_CPU, &limit);
823 if (param.find(MAXDISKSIZE) != param.end()) {
824 int maxdisksize = param[MAXDISKSIZE];
826 limit.rlim_cur = maxdisksize * 1024;
827 limit.rlim_max = int(maxdisksize * 1.1) * 1024;
828 setrlimit(RLIMIT_FSIZE, &limit);
831 if (param.find(MAXRAMSIZE) != param.end()) {
832 int maxramsize = param[MAXRAMSIZE];
834 limit.rlim_cur = maxramsize * 1024;
835 limit.rlim_max = int(maxramsize * 1.1) * 1024;
836 setrlimit(RLIMIT_AS, &limit);
840 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
841 //int result = execv("/usr/bin/ssh", parmList);
842 //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
843 //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
845 // Create a session for the child so that it is not
846 // killed when the shell exits (the shell opens a session and
847 // kills every process belonging to that session when it exits)
851 // Close the standard file descriptors
852 //close(STDIN_FILENO);
853 //close(STDOUT_FILENO);
854 //close(STDERR_FILENO);
857 // Execute the child's command
// execve() does not search PATH, so argv[0] (the first element returned by exec_command())
// must be a path to an executable; the child sees only the variables placed in envp.
858 int result = execve(argv[0], argv, envp);
859 UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
860 // No need to deallocate since nothing happens after a successful exec
862 // Normally we should never get here
863 ofstream file_err("error.log");
864 UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
866 } catch (GenericException & e) {
868 std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
// Launches the job command as a Win32 process (CreateProcess with CREATE_NO_WINDOW).
// - The command line is the space-joined result of exec_command().
// - The environment block is built from the job's Environnement as consecutive
//   "key=value\0" strings followed by one final '\0' that ends the block.
// - Missing EXECUTABLE or a CreateProcess failure raises RunTimeException, which the
//   catch clause below reports on std::cerr.
// - The thread handle is closed; the returned value (presumably the process handle) is
//   produced on lines not visible here.
// NOTE(review): chNewEnv is not freed on any visible line (and not on the throw path).
876 BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
878 Parametre param = _job.getParametre();
879 Parametre::iterator it;
880 PROCESS_INFORMATION pi;
884 // EXECUTABLE is MANDATORY, if missing, we throw an exception
885 vector<string> exec_command;
886 if (param.find(EXECUTABLE) != param.end()) {
887 exec_command = _bm.exec_command(param);
889 throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
892 // Build the command string from the command argument vector
894 for (unsigned int i=0 ; i<exec_command.size() ; i++) {
895 if (i>0) comstr += " ";
896 comstr += exec_command[i];
899 UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
901 // Create the environment for the new process. Note (RB): Here we change the environment for
902 // the process launched in local. It would seem more logical to set the environment for the
904 // Note that if no environment is specified, we reuse the current environment.
905 Environnement env = _job.getEnvironnement();
906 char * chNewEnv = NULL;
// Size the environment block exactly: every "key=value" string plus its terminating NUL,
// plus one final NUL that ends the block. A fixed 4096-byte buffer was used here before and
// was filled without bounds checks, overflowing the heap whenever the job environment was
// larger than that.
size_t envBlockSize = 1;
for (Environnement::const_iterator sizeIt = env.begin() ; sizeIt != env.end() ; sizeIt++)
  envBlockSize += (*sizeIt).first.size() + 1 + (*sizeIt).second.size() + 1;
909 chNewEnv = new char[envBlockSize];
910 LPTSTR lpszCurrentVariable = chNewEnv;
911 for(Environnement::const_iterator it=env.begin() ; it!=env.end() ; it++) {
912 const string & key = (*it).first;
913 const string & value = (*it).second;
914 string envvar = key + "=" + value;
915 envvar.copy(lpszCurrentVariable, envvar.size());
916 lpszCurrentVariable[envvar.size()] = '\0';
917 lpszCurrentVariable += lstrlen(lpszCurrentVariable) + 1;
919 // Terminate the block with a NULL byte.
920 *lpszCurrentVariable = '\0';
925 ZeroMemory( &si, sizeof(si) );
927 ZeroMemory( &pi, sizeof(pi) );
929 // Copy the command to a non-const buffer
930 char * buffer = strdup(comstr.c_str());
932 // launch the new process
933 BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
934 CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
936 if (buffer) free(buffer);
937 if (!res) throw RunTimeException("Error while creating new process");
939 CloseHandle(pi.hThread);
941 } catch (GenericException & e) {
943 std::cerr << "Caught exception : " << e.type << " : " << e.message << std::endl;
// Cleanup handler pushed with pthread_cleanup_push() in ThreadAdapter::run(). When a job
// thread is cancelled, it sends SIGTERM to the child process. p_pid points to the `child`
// variable of run().
952 void BatchManager_Local::kill_child_on_exit(void * p_pid)
955 //TODO: porting of following functionality
956 pid_t child = * static_cast<pid_t *>(p_pid);
// NOTE(review): if `child` still holds 0 or -1 when this runs (its initial value in run()
// is not visible here), kill() would signal the caller's whole process group (0) or every
// process it may signal (-1). Consider guarding with child > 0.
959 kill(child, SIGTERM);
961 // Note: this could also be followed by a kill(child, SIGKILL),
962 // but that option is not implemented for now because it is
963 // preferable to let the child process terminate normally and on its own.
// Cleanup handler pushed with pthread_cleanup_push() in ThreadAdapter::run(); receives that
// thread's ThreadAdapter (the rest of the body lies beyond this listing).
967 void BatchManager_Local::delete_on_exit(void * arg)
969 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);