1 // Copyright (C) 2007-2021 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 * BatchManager_Local.cxx :
25 * Auteur : Ivan DUTKA-MALEN - EDF R&D
26 * Mail : mailto:ivan.dutka-malen@der.edf.fr
27 * Date : Thu Nov 6 10:17:22 2003
30 * Refactored by Renaud Barate (EDF R&D) in September 2009 to use
31 * CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
41 #include <sys/types.h>
54 #include "Constants.hxx"
55 #include "IOMutex.hxx"
56 #include "BatchManager_Local.hxx"
57 #include "RunTimeException.hxx"
// Constructor: forwards parent, host, username, protocol type and MPI
// implementation to the BatchManager base class, then initializes
// _threads_mutex (guards the _threads table) and _threadSyncCondition (used by
// runJob/deleteJob/the destructor to wait for worker threads).
67 BatchManager_Local::BatchManager_Local(const Batch::FactBatchManager * parent, const char * host,
68 const char * username,
69 CommunicationProtocolType protocolType, const char * mpiImpl)
70 : BatchManager(parent, host, username, protocolType, mpiImpl), _connect(0),
73 pthread_mutex_init(&_threads_mutex, NULL);
74 pthread_cond_init(&_threadSyncCondition, NULL);
// Destructor: for every job whose STATE is neither FINISHED nor FAILED, logs a
// warning, cancels its worker thread and waits on _threadSyncCondition (which
// setFailedOnCancel signals from the canceled thread). Then destroys the mutex
// and the condition variable.
// NOTE(review): pthread_cond_wait is not wrapped in a predicate loop, so a
// spurious wakeup could let the destructor continue before the canceled thread
// has run its cleanup handlers - confirm whether this is acceptable.
78 BatchManager_Local::~BatchManager_Local()
80 for (map<Id, Child>::iterator iter = _threads.begin() ; iter != _threads.end() ; iter++) {
81 pthread_mutex_lock(&_threads_mutex);
82 string state = iter->second.param[STATE];
83 if (state != FINISHED && state != FAILED) {
84 UNDER_LOCK( LOG("Warning: Job " << iter->first <<
85 " is not finished, it will now be canceled."));
86 pthread_cancel(iter->second.thread_id);
87 pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
89 pthread_mutex_unlock(&_threads_mutex);
91 pthread_mutex_destroy(&_threads_mutex);
92 pthread_cond_destroy(&_threadSyncCondition);
// Submits a job: creates a ThreadAdapter for it and starts a detached worker
// thread with it as argument. While holding _threads_mutex, the caller waits
// on _threadSyncCondition until the worker has registered the job in _threads
// (ThreadAdapter::pere signals it), then returns a JobId built from id_sst.
// Throws RunTimeException("Can't create new thread in BatchManager_Local")
// when thread creation fails.
// NOTE(review): the pthread_cond_wait below has no predicate loop; a spurious
// wakeup could return before the worker registered the job - confirm.
// NOTE(review): no delete of p_ta is visible on the thread-creation failure
// path before the throw - possible leak, confirm.
95 // Job control method: submits a job to the manager
96 const JobId BatchManager_Local::runJob(const Job & job)
98 Job_Local jobLocal = job;
100 ThreadAdapter * p_ta = new ThreadAdapter(*this, job, id);
102 // Attributes of the thread at creation time
103 pthread_attr_t thread_attr;
104 pthread_attr_init(&thread_attr);
105 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
107 // Create the thread that will execute the system command passed to it
109 pthread_mutex_lock(&_threads_mutex);
110 int rc = pthread_create(&thread_id,
113 static_cast<void *>(p_ta));
115 // Release the memory held by the thread attributes, no longer needed
116 pthread_attr_destroy(&thread_attr);
119 pthread_mutex_unlock(&_threads_mutex);
120 throw RunTimeException("Can't create new thread in BatchManager_Local");
123 pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
124 pthread_mutex_unlock(&_threads_mutex);
126 ostringstream id_sst;
128 return JobId(this, id_sst.str());
// Cancels a job. Under _threads_mutex:
//  - if the job's STATE is neither FINISHED nor FAILED, cancels its worker
//    thread and waits on _threadSyncCondition (signaled by setFailedOnCancel);
//  - otherwise logs that the job is already finished.
// Throws RunTimeException("Job with id ... does not exist") for unknown ids;
// presumably this is guarded by idFound on a line not shown.
// NOTE(review): no erase of the entry from _threads is visible, and the
// cond wait has no predicate loop - confirm both are intended.
131 // Job control method: removes a job from the manager
132 void BatchManager_Local::deleteJob(const JobId & jobid)
136 istringstream iss(jobid.getReference());
139 // @@@ --------> CRITICAL SECTION <-------- @@@
140 pthread_mutex_lock(&_threads_mutex);
141 bool idFound = (_threads.find(id) != _threads.end());
143 string state = _threads[id].param[STATE];
144 if (state != FINISHED && state != FAILED) {
145 pthread_cancel(_threads[id].thread_id);
146 pthread_cond_wait(&_threadSyncCondition, &_threads_mutex);
148 LOG("Cannot delete job " << jobid.getReference() << ". Job is already finished.");
151 pthread_mutex_unlock(&_threads_mutex);
152 // @@@ --------> CRITICAL SECTION <-------- @@@
155 throw RunTimeException(string("Job with id ") + jobid.getReference() + " does not exist");
// Pushes a HOLD command onto the job's command queue, under _threads_mutex.
// The worker thread (ThreadAdapter::pere) polls that queue.
// Unknown ids are silently ignored, unlike deleteJob/queryJob which throw.
// NOTE(review): pere does not call waitpid with WUNTRACED (see its note), so
// the job's STATE is presumably not switched to PAUSED after a hold - confirm.
158 // Job control method: suspends a queued job
159 void BatchManager_Local::holdJob(const JobId & jobid)
162 istringstream iss(jobid.getReference());
165 UNDER_LOCK( LOG("BatchManager is sending HOLD command to the thread " << id) );
167 // Push a command onto the thread's command queue
168 // @@@ --------> CRITICAL SECTION <-------- @@@
169 pthread_mutex_lock(&_threads_mutex);
170 if (_threads.find(id) != _threads.end())
171 _threads[id].command_queue.push(HOLD);
172 pthread_mutex_unlock(&_threads_mutex);
173 // @@@ --------> CRITICAL SECTION <-------- @@@
// Pushes a RELEASE command onto the job's command queue, under
// _threads_mutex. The worker thread (ThreadAdapter::pere) polls that queue.
// Unknown ids are silently ignored, unlike deleteJob/queryJob which throw.
176 // Job control method: releases a suspended job
177 void BatchManager_Local::releaseJob(const JobId & jobid)
180 istringstream iss(jobid.getReference());
183 UNDER_LOCK( LOG("BatchManager is sending RELEASE command to the thread " << id) );
185 // Push a command onto the thread's command queue
186 // @@@ --------> CRITICAL SECTION <-------- @@@
187 pthread_mutex_lock(&_threads_mutex);
188 if (_threads.find(id) != _threads.end())
189 _threads[id].command_queue.push(RELEASE);
190 pthread_mutex_unlock(&_threads_mutex);
191 // @@@ --------> CRITICAL SECTION <-------- @@@
// Looks the job up with find(), so unknown ids are not inserted into
// _threads. Copies its param and env under _threads_mutex and builds a
// JobInfo_Local from them. Throws
// InvalidArgumentException("Invalid JobId argument for queryJob") when the
// id is not found.
194 // Job control method: returns the job state
195 JobInfo BatchManager_Local::queryJob(const JobId & jobid)
198 istringstream iss(jobid.getReference());
204 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
205 // @@@ --------> CRITICAL SECTION <-------- @@@
206 pthread_mutex_lock(&_threads_mutex);
207 std::map<Id, Child >::iterator pos = _threads.find(id);
208 bool found = (pos != _threads.end());
210 param = pos->second.param;
211 env = pos->second.env;
213 pthread_mutex_unlock(&_threads_mutex);
214 // @@@ --------> CRITICAL SECTION <-------- @@@
215 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
217 if (!found) throw InvalidArgumentException("Invalid JobId argument for queryJob");
219 JobInfo_Local ji(param, env);
224 // This manager cannot resume (re-attach to) an existing job.
225 // The job state is therefore forced to error - to do so, no Id is given
// (the returned JobId reference is the literal "undefined"; both parameters
// are ignored).
228 BatchManager_Local::addJob(const Batch::Job & /*job*/, const std::string & /*reference*/)
230 return JobId(this, "undefined");
// Reports whether the job's STATE, read under _threads_mutex, equals RUNNING.
// NOTE(review): _threads[id] uses std::map::operator[], which inserts a
// default-constructed Child for an unknown id instead of reporting it.
// queryJob uses find() for the same lookup - consider doing the same here.
// (The commented-out log lines below mention queryJob, not isRunning.)
233 // Job control method: tests whether a job is present on the machine
234 bool BatchManager_Local::isRunning(const JobId & jobid)
237 istringstream iss(jobid.getReference());
240 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : AVANT section critique" << endl );
241 // @@@ --------> CRITICAL SECTION <-------- @@@
242 pthread_mutex_lock(&_threads_mutex);
243 bool running = (_threads[id].param[STATE].str() == RUNNING);
244 pthread_mutex_unlock(&_threads_mutex);
245 // @@@ --------> CRITICAL SECTION <-------- @@@
246 //UNDER_LOCK( cout << "JobInfo BatchManager_Local::queryJob(const JobId & jobid) : APRES section critique" << endl );
// Generates the launch script for a job and copies it to the execution host.
// param[WORKDIR] and param[EXECUTABLE] are mandatory; RunTimeException is
// thrown if either is missing. The remote script path is
// workDir + separator + <executable name without extension> + "_launch_job".
//
// Script content:
//  - Windows/SH branch (uses _splitpath/_MAX_DRIVE; the enclosing #ifdef is
//    not visible here): switches to the drive and workDir, "set"s each
//    environment variable, runs the executable with its ARGUMENTS, and
//    appends ".bat" to the remote path.
//  - sh branch:
//      * applies the optional ulimits (MAXCPUTIME, MAXDISKSIZE, MAXRAMSIZE);
//      * exports the environment;
//      * writes NBPROC copies of the hostname into a mktemp file exported as
//        LIBBATCH_NODEFILE;
//      * runs ./<executable> with its ARGUMENTS and the stdin/stdout/stderr
//        redirections;
//      * removes the node file.
//
// After generation, the local temp file is chmod'ed to 0x1ED (== 0755) and
// copied with _bm._protocol.copyFile. For non-SH protocols it is also made
// executable remotely with "chmod u+x". A failure in either step throws
// RunTimeException.
251 string BatchManager_Local::ThreadAdapter::buildCommandFile(const Job_Local & job)
253 Parametre param = job.getParametre();
255 // Mandatory parameters
257 if (param.find(WORKDIR) != param.end())
258 workDir = param[WORKDIR].str();
260 throw RunTimeException("param[WORKDIR] is not defined. Please define it, cannot submit this job.");
261 string fileToExecute = "";
262 if (param.find(EXECUTABLE) != param.end())
263 fileToExecute = param[EXECUTABLE].str();
265 throw RunTimeException("param[EXECUTABLE] is not defined. Please define it, cannot submit this job.");
267 const char separator = '\\';
269 const char separator = '/';
// p1 = last path separator, p2 = last '.'. rootNameToExecute is the file
// name without its extension. If there is no '.' after the last separator,
// the computed length is huge, so substr keeps the rest of the name.
271 string::size_type p1 = fileToExecute.find_last_of(separator);
272 string::size_type p2 = fileToExecute.find_last_of(".");
273 string rootNameToExecute = fileToExecute.substr(p1+1,p2-p1-1);
274 string fileNameToExecute = fileToExecute.substr(p1+1);
275 string remotePath = workDir + separator + rootNameToExecute + "_launch_job";
277 // Create batch submit file
278 ofstream tempOutputFile;
279 string tmpFileName = Utils::createAndOpenTemporaryFile("launch-job", tempOutputFile);
282 if (_bm._protocol.getType() == SH) {
283 char drive[_MAX_DRIVE];
284 _splitpath(workDir.c_str(), drive, NULL, NULL, NULL);
285 if (strlen(drive) > 0) tempOutputFile << drive << endl;
286 tempOutputFile << "cd " << Utils::fixPath(workDir) << endl;
287 // Define environment for the job
288 Environnement env = job.getEnvironnement();
289 for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
290 tempOutputFile << "set " << iter->first << "=" << iter->second << endl;
292 // Launch the executable
293 tempOutputFile << fileNameToExecute;
294 if (param.find(ARGUMENTS) != param.end()) {
295 Versatile V = param[ARGUMENTS];
296 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
297 StringType argt = * static_cast<StringType *>(*it);
299 tempOutputFile << " " << arg;
302 remotePath += ".bat";
306 tempOutputFile << "#!/bin/sh -f" << endl;
307 tempOutputFile << "cd " << workDir << endl;
309 // Optional parameters (system limits on the job process)
// ulimit -t takes seconds; MAXCPUTIME is presumably given in minutes.
310 if (param.find(MAXCPUTIME) != param.end()) {
311 long maxcputime = (long)param[MAXCPUTIME] * 60;
312 tempOutputFile << "ulimit -H -t " << maxcputime << endl;
// MAXDISKSIZE is presumably in MB. NOTE(review): ulimit -f units differ
// between shells (512-byte blocks per POSIX, 1024 bytes in bash) - verify
// for the target /bin/sh.
315 if (param.find(MAXDISKSIZE) != param.end()) {
316 long maxdisksize = (long)param[MAXDISKSIZE] * 1024;
317 tempOutputFile << "ulimit -H -f " << maxdisksize << endl;
// MAXRAMSIZE is presumably in MB; ulimit -v counts KiB in common shells.
320 if (param.find(MAXRAMSIZE) != param.end()) {
321 long maxramsize = (long)param[MAXRAMSIZE] * 1024;
322 tempOutputFile << "ulimit -H -v " << maxramsize << endl;
325 // Number of cores to use
327 if (param.find(NBPROC) != param.end())
328 nbproc = param[NBPROC];
330 // Define environment for the job
331 Environnement env = job.getEnvironnement();
332 for (Environnement::const_iterator iter = env.begin() ; iter != env.end() ; ++iter) {
333 tempOutputFile << "export " << iter->first << "=" << iter->second << endl;
336 // generate nodes file (one line per required proc)
337 tempOutputFile << "LIBBATCH_NODEFILE=$(mktemp nodefile-XXXXXXXXXX)" << endl;
338 tempOutputFile << "i=" << nbproc << endl;
339 tempOutputFile << "hn=$(hostname)" << endl;
340 tempOutputFile << "{" << endl;
341 tempOutputFile << "while [ $i -gt 0 ]" << endl;
342 tempOutputFile << "do" << endl;
343 tempOutputFile << " echo \"$hn\"" << endl;
344 tempOutputFile << " i=$((i-1))" << endl;
345 tempOutputFile << "done" << endl;
346 tempOutputFile << "} > \"$LIBBATCH_NODEFILE\"" << endl;
347 tempOutputFile << "export LIBBATCH_NODEFILE" << endl;
349 // Launch the executable
350 tempOutputFile << "./" + fileNameToExecute;
351 if (param.find(ARGUMENTS) != param.end()) {
352 Versatile V = param[ARGUMENTS];
353 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
354 StringType argt = * static_cast<StringType *>(*it);
356 tempOutputFile << " " << arg;
360 // Standard input and output
// An INFILE couple whose remote name is "stdin" redirects standard input.
361 if (param.find(INFILE) != param.end()) {
362 Versatile V = param[INFILE];
363 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
364 Couple cpl = * static_cast<CoupleType*>(*it);
365 string remote = cpl.getRemote();
366 if (remote == "stdin")
367 tempOutputFile << " <stdin";
// By default stdout/stderr go to workDir/logs/{output,error}.log.<root>.
// An OUTFILE couple named "stdout"/"stderr" redirects them to files with
// those names instead.
371 string stdoutFile = workDir + "/logs/output.log." + rootNameToExecute;
372 string stderrFile = workDir + "/logs/error.log." + rootNameToExecute;
373 if (param.find(OUTFILE) != param.end()) {
374 Versatile V = param[OUTFILE];
375 for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
376 Couple cpl = * static_cast<CoupleType*>(*it);
377 string remote = cpl.getRemote();
378 if (remote == "stdout") stdoutFile = "stdout";
379 if (remote == "stderr") stderrFile = "stderr";
382 tempOutputFile << " 1>" << stdoutFile << " 2>" << stderrFile << endl;
384 // Remove the node file
385 tempOutputFile << "rm \"$LIBBATCH_NODEFILE\"" << endl;
391 tempOutputFile.flush();
392 tempOutputFile.close();
394 LOG("Batch script file generated is: " << tmpFileName);
// 0x1ED == 0755 (rwxr-xr-x)
396 Utils::chmod(tmpFileName.c_str(), 0x1ED);
397 int status = _bm._protocol.copyFile(tmpFileName, "", "",
398 remotePath, _bm._hostname, _bm._username);
400 throw RunTimeException("Cannot copy command file on host " + _bm._hostname);
403 if (_bm._protocol.getType() != SH) {
404 // On Windows, we make the remote file executable afterward because
405 // pscp does not preserve access permissions on files
406 string subCommand = string("chmod u+x ") + remotePath;
407 string command = _bm._protocol.getExecCommand(subCommand, _bm._hostname, _bm._username);
409 status = system(command.c_str());
411 std::ostringstream oss;
412 oss << "Cannot change permissions of file " << remotePath << " on host " << _bm._hostname;
413 oss << ". Return status is " << status;
414 throw RunTimeException(oss.str());
424 // Constructor of the ThreadAdapter class
// Stores the owning manager, the job and its id in _bm, _job and _id.
425 BatchManager_Local::ThreadAdapter::ThreadAdapter(BatchManager_Local & bm, const Job_Local & job, Id id) :
426 _bm(bm), _job(job), _id(id)
// Worker thread entry point; arg is the ThreadAdapter created by runJob.
// Steps:
//  1. Blocks all signals and forces deferred cancellation.
//  2. Pushes three cleanup handlers: delete_on_exit, setFailedOnCancel and
//     kill_child_on_exit.
//  3. Launches the child process.
//  4. On the parent side (child > 0), under _threads_mutex: sets the final
//     STATE (FAILED if hasFailed, else FINISHED), then pops the three
//     handlers without executing them.
433 // Thread execution method
434 void * BatchManager_Local::ThreadAdapter::run(void * arg)
436 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
439 // Block all signals for this thread
441 sigfillset(&setmask);
442 pthread_sigmask(SIG_BLOCK, &setmask, NULL);
445 // Allow deferred cancellation of the thread
446 // (these are the default values, but they are forced as a precaution)
447 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
448 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
450 // Register the cleanup handlers to run if the thread is canceled.
451 // They are called automatically when a cancellation request
452 // is acted upon, e.g. by pthread_testcancel().
// They form a LIFO stack: pthread_cleanup_pop removes kill_child_on_exit first,
// then setFailedOnCancel, then delete_on_exit.
454 pthread_cleanup_push(BatchManager_Local::delete_on_exit, arg);
455 pthread_cleanup_push(BatchManager_Local::setFailedOnCancel, arg);
456 pthread_cleanup_push(BatchManager_Local::kill_child_on_exit, static_cast<void *> (&child));
458 // Fork/exec a new process so that the child can be controlled
459 // (more finely than with a system() call)
460 // int rc = system(commande.c_str());
461 //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
462 //execv("/usr/bin/ssh", parmList);
464 child = p_ta->launchWin32ChildProcess();
467 // LOCK&UNLOCK needed to avoid potential deadlock if a thread holds the lock
471 if (child < 0) { // error
472 UNDER_LOCK( LOG("Fork impossible (rc=" << child << ")") );
474 } else if (child > 0) { // parent
482 pthread_mutex_lock(&p_ta->_bm._threads_mutex);
484 // Set the job state to FINISHED or FAILED
485 p_ta->_bm._threads[p_ta->_id].param[STATE] = (p_ta->_bm._threads[p_ta->_id].hasFailed) ? FAILED : FINISHED;
487 // Pop kill_child_on_exit (last pushed) without executing it
488 pthread_cleanup_pop(0);
490 // Pop setFailedOnCancel without executing it
491 pthread_cleanup_pop(0);
493 // Pop delete_on_exit (first pushed) without executing it
494 pthread_cleanup_pop(0);
496 pthread_mutex_unlock(&p_ta->_bm._threads_mutex);
498 // Invoke the memory cleanup function
501 UNDER_LOCK( LOG("Father is leaving") );
// Parent-side monitoring loop run by the worker thread for a launched child.
//  - Registers the job in _bm._threads: thread id, pid, hasFailed=false,
//    param with ID and STATE=RUNNING, env, and an initial NOP command.
//  - Signals _threadSyncCondition to release runJob.
//  - Then polls the child:
//      * checks its status (GetExitCodeProcess on Windows, waitpid with
//        WNOHANG otherwise);
//      * honors cancellation via pthread_testcancel();
//      * enforces MAXWALLTIME by queueing TERM and, beyond a 10% margin, KILL;
//      * drains the command queue, sending SIGSTOP/SIGCONT/SIGTERM/SIGKILL
//        (presumably for HOLD/RELEASE/TERM/KILL; the case labels are not
//        shown).
// NOTE(review): the _threads writes before pthread_cond_signal take no
// _threads_mutex in the visible code; the comment there says protection
// starts only after the signal. Confirm no other thread can touch _threads
// at that point.
// NOTE(review): while the elapsed time stays past the limit, a TERM or KILL
// command is queued on every poll iteration. No flag preventing repeats is
// visible.
509 void BatchManager_Local::ThreadAdapter::pere(Process child)
511 time_t child_starttime = time(NULL);
513 // Register the child in the thread table
514 pthread_t thread_id = pthread_self();
516 Parametre param = _job.getParametre();
517 Environnement env = _job.getEnvironnement();
519 ostringstream id_sst;
521 param[ID] = id_sst.str();
522 param[STATE] = Batch::RUNNING;
524 _bm._threads[_id].thread_id = thread_id;
526 _bm._threads[_id].pid = child;
528 _bm._threads[_id].hasFailed = false;
529 _bm._threads[_id].param = param;
530 _bm._threads[_id].env = env;
531 _bm._threads[_id].command_queue.push(NOP);
533 // Unlock the master thread. From here, all shared variables must be protected
534 // from concurrent access
535 pthread_cond_signal(&_bm._threadSyncCondition);
538 // Loop until the child has finished
542 GetExitCodeProcess(child, &exitCode);
543 if (exitCode != STILL_ACTIVE) {
544 UNDER_LOCK( LOG("Father sees his child is DONE: exit code = " << exitCode) );
549 pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
550 if (child_wait_rc > 0) {
// NOTE(review): these W* macros are logged unconditionally; WEXITSTATUS,
// WTERMSIG, WCOREDUMP and WSTOPSIG are only meaningful when the
// corresponding WIFEXITED/WIFSIGNALED/WIFSTOPPED is true.
551 UNDER_LOCK( LOG("Status is: " << WIFEXITED( child_rc)) );
552 UNDER_LOCK( LOG("Status is: " << WEXITSTATUS( child_rc)) );
553 UNDER_LOCK( LOG("Status is: " << WIFSIGNALED( child_rc)) );
554 UNDER_LOCK( LOG("Status is: " << WTERMSIG( child_rc)) );
555 UNDER_LOCK( LOG("Status is: " << WCOREDUMP( child_rc)) );
556 UNDER_LOCK( LOG("Status is: " << WIFSTOPPED( child_rc)) );
557 UNDER_LOCK( LOG("Status is: " << WSTOPSIG( child_rc)) );
559 UNDER_LOCK( LOG("Status is: " << WIFCONTINUED( child_rc)) ); // not compilable on sarge
561 if (WIFSTOPPED(child_rc)) {
562 // NOTE: to enter this section, the WUNTRACED flag must
563 // be set in the waitpid call above. This flag is usually
564 // disabled because, although a process stop can be detected,
565 // its resumption is harder to detect.
567 // The child is merely stopped
568 // @@@ --------> CRITICAL SECTION <-------- @@@
569 pthread_mutex_lock(&_bm._threads_mutex);
570 _bm._threads[_id].param[STATE] = Batch::PAUSED;
571 pthread_mutex_unlock(&_bm._threads_mutex);
572 // @@@ --------> CRITICAL SECTION <-------- @@@
573 UNDER_LOCK( LOG("Father sees his child is STOPPED : " << child_wait_rc) );
577 // The child has finished; leave the loop and the enclosing if
578 UNDER_LOCK( LOG("Father sees his child is DONE : " << child_wait_rc << " (child_rc=" << (WIFEXITED(child_rc) ? WEXITSTATUS(child_rc) : -1) << ")") );
582 else if (child_wait_rc == -1) {
583 // The child has disappeared ...
584 // @@@ --------> CRITICAL SECTION <-------- @@@
585 pthread_mutex_lock(&_bm._threads_mutex);
586 _bm._threads[_id].hasFailed = true;
587 pthread_mutex_unlock(&_bm._threads_mutex);
588 // @@@ --------> CRITICAL SECTION <-------- @@@
589 UNDER_LOCK( LOG("Father sees his child is DEAD : " << child_wait_rc << " (Reason : " << strerror(errno) << ")") );
594 // Check whether the thread must be destroyed (cancellation point)
595 pthread_testcancel();
599 // Check whether the child has exceeded its time limit (wallclock time)
600 time_t child_currenttime = time(NULL);
601 long child_elapsedtime_minutes = (child_currenttime - child_starttime) / 60L;
602 if (param.find(MAXWALLTIME) != param.end()) {
603 long maxwalltime = param[MAXWALLTIME];
604 // cout << "child_starttime = " << child_starttime << endl
605 // << "child_currenttime = " << child_currenttime << endl
606 // << "child_elapsedtime = " << child_elapsedtime << endl
607 // << "maxwalltime = " << maxwalltime << endl
608 // << "int(maxwalltime * 1.1) = " << int(maxwalltime * 1.1) << endl;
609 if (child_elapsedtime_minutes > long((float)maxwalltime * 1.1) ) { // Allow a 10% margin before the KILL
610 UNDER_LOCK( LOG("Father is sending KILL command to the thread " << _id) );
611 // Push a command onto the thread's command queue
612 // @@@ --------> CRITICAL SECTION <-------- @@@
613 pthread_mutex_lock(&_bm._threads_mutex);
614 if (_bm._threads.find(_id) != _bm._threads.end())
615 _bm._threads[_id].command_queue.push(KILL);
616 pthread_mutex_unlock(&_bm._threads_mutex);
617 // @@@ --------> CRITICAL SECTION <-------- @@@
620 } else if (child_elapsedtime_minutes > maxwalltime ) {
621 UNDER_LOCK( LOG("Father is sending TERM command to the thread " << _id) );
622 // Push a command onto the thread's command queue
623 // @@@ --------> CRITICAL SECTION <-------- @@@
624 pthread_mutex_lock(&_bm._threads_mutex);
625 if (_bm._threads.find(_id) != _bm._threads.end())
626 _bm._threads[_id].command_queue.push(TERM);
627 pthread_mutex_unlock(&_bm._threads_mutex);
628 // @@@ --------> CRITICAL SECTION <-------- @@@
634 // Check whether there is anything to do in the command queue
635 // @@@ --------> CRITICAL SECTION <-------- @@@
636 pthread_mutex_lock(&_bm._threads_mutex);
637 if (_bm._threads.find(_id) != _bm._threads.end()) {
638 while (_bm._threads[_id].command_queue.size() > 0) {
639 Commande cmd = _bm._threads[_id].command_queue.front();
640 _bm._threads[_id].command_queue.pop();
644 UNDER_LOCK( LOG("Father does nothing to his child") );
648 UNDER_LOCK( LOG("Father is sending SIGSTOP signal to his child") );
649 kill(child, SIGSTOP);
653 UNDER_LOCK( LOG("Father is sending SIGCONT signal to his child") );
654 kill(child, SIGCONT);
658 UNDER_LOCK( LOG("Father is sending SIGTERM signal to his child") );
659 kill(child, SIGTERM);
663 UNDER_LOCK( LOG("Father is sending SIGKILL signal to his child") );
664 kill(child, SIGKILL);
676 pthread_mutex_unlock(&_bm._threads_mutex);
677 // @@@ --------> CRITICAL SECTION <-------- @@@
679 // Pause briefly so as not to load the CPU needlessly
// Child-side launcher ("fils" = child). Presumably this runs in the forked
// child process.
//  - Builds and copies the launch script (buildCommandFile).
//  - Gets the exec argument vector from the protocol and converts it to a
//    NULL-terminated char* array (never freed, see the comment at execv).
//  - execv()s it. If execv returns, logs strerror(errno) and writes a message
//    to "error.log".
// GenericException is caught and logged.
// NOTE(review): if this runs in a fork() child of a multithreaded process,
// POSIX only allows async-signal-safe functions until exec. new[], iostreams
// and LOG used here are not async-signal-safe - confirm this is acceptable.
// NOTE(review): the errno message after execv reuses the
// "*** debug_command = " label, which is misleading.
694 void BatchManager_Local::ThreadAdapter::fils()
696 Parametre param = _job.getParametre();
697 Parametre::iterator it;
701 // build script file to launch the job and copy it on the server
702 string cmdFilePath = buildCommandFile(_job);
704 // define command to submit the job
705 vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
707 // Build the argument array argv from the command
708 char ** argv = new char * [command.size() + 1];
710 for (string::size_type i=0 ; i<command.size() ; i++) {
711 argv[i] = new char[command[i].size() + 1];
712 strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
713 comstr += command[i] + " ";
715 argv[command.size()] = NULL;
716 UNDER_LOCK( LOG("*** debug_command = " << comstr) );
718 // Create a session for the child so that it is not
719 // destroyed when the shell exits (the shell opens a session and
720 // kills every process belonging to the session when it exits)
723 // Close the standard file descriptors (currently disabled)
724 //close(STDIN_FILENO);
725 //close(STDOUT_FILENO);
726 //close(STDERR_FILENO);
728 // Execute the child's command
729 execv(argv[0], argv);
730 UNDER_LOCK( LOG("*** debug_command = " << strerror(errno)) );
731 // No need to deallocate since nothing happens after a successful exec
733 // Normally we should never get here
734 ofstream file_err("error.log");
735 UNDER_LOCK( file_err << "Echec de l'appel a execve" << endl );
737 } catch (GenericException & e) {
739 LOG("Caught exception : " << e.type << " : " << e.message);
// Windows launcher.
//  - Builds and copies the launch script (buildCommandFile).
//  - Joins the protocol's exec arguments with single spaces into one command
//    line and starts it with CreateProcess(..., CREATE_NO_WINDOW, ...).
//    The strdup'ed command buffer is freed right after the call.
//  - If CreateProcess fails, throws RunTimeException("Error while creating
//    new process").
//  - Otherwise closes the thread handle immediately. The process handle is
//    presumably what is returned (the return statement is not shown).
// GenericException is caught and logged.
// NOTE(review): arguments are joined without quoting, so an argument that
// contains spaces is split by the command-line parser.
747 BatchManager_Local::Process BatchManager_Local::ThreadAdapter::launchWin32ChildProcess()
749 Parametre param = _job.getParametre();
750 Parametre::iterator it;
751 PROCESS_INFORMATION pi;
752 ZeroMemory( &pi, sizeof(pi) );
756 // build script file to launch the job and copy it on the server
757 string cmdFilePath = buildCommandFile(_job);
759 // define command to submit the job
760 vector<string> command = _bm._protocol.getExecCommandArgs(cmdFilePath, _bm._hostname, _bm._username);
762 // Build the command string from the command argument vector
764 for (unsigned int i=0 ; i<command.size() ; i++) {
765 if (i>0) comstr += " ";
766 comstr += command[i];
769 UNDER_LOCK( LOG("*** debug_command = " << comstr) );
772 ZeroMemory( &si, sizeof(si) );
775 // Copy the command to a non-const buffer
776 char * buffer = strdup(comstr.c_str());
778 // launch the new process
779 bool res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
780 CREATE_NO_WINDOW, NULL, NULL, &si, &pi);
782 if (buffer) free(buffer);
783 if (!res) throw RunTimeException("Error while creating new process");
785 CloseHandle(pi.hThread);
787 } catch (GenericException & e) {
789 LOG("Caught exception : " << e.type << " : " << e.message);
// Cancellation cleanup handler pushed by ThreadAdapter::run. p_pid points to
// the child's pid; the handler sends it SIGTERM.
798 void BatchManager_Local::kill_child_on_exit(void * p_pid)
801 //TODO: porting of following functionality
802 pid_t child = * static_cast<pid_t *>(p_pid);
805 kill(child, SIGTERM);
807 // Note: a kill(child, SIGKILL) could also be issued afterwards,
808 // but this option is not implemented for now, because it is
809 // preferable to let the child process terminate normally on its own.
// Cleanup handler pushed first by ThreadAdapter::run. arg is the ThreadAdapter
// allocated in runJob; presumably this deletes it (the statement is not shown
// here - confirm).
813 void BatchManager_Local::delete_on_exit(void * arg)
815 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
// Cancellation cleanup handler pushed by ThreadAdapter::run. Marks the job
// FAILED in _threads under _threads_mutex, then signals _threadSyncCondition
// to wake the thread waiting on it (deleteJob or the destructor, after their
// pthread_cancel).
// NOTE(review): _threads[...] uses std::map::operator[], which inserts an
// entry if the id has not been registered yet (e.g. cancellation before pere
// ran).
819 void BatchManager_Local::setFailedOnCancel(void * arg)
821 ThreadAdapter * p_ta = static_cast<ThreadAdapter *>(arg);
822 pthread_mutex_lock(&p_ta->getBatchManager()._threads_mutex);
823 p_ta->getBatchManager()._threads[p_ta->getId()].param[STATE] = FAILED;
824 pthread_mutex_unlock(&p_ta->getBatchManager()._threads_mutex);
826 // Unlock the master thread. From here, the batch manager instance should not be used.
827 pthread_cond_signal(&p_ta->getBatchManager()._threadSyncCondition);