* Date : Thu Nov 6 10:17:22 2003
* Projet : Salome 2
*
+* Refactored by Renaud Barate (EDF R&D) in September 2009 to use
+* CommunicationProtocol classes and merge Local_SH, Local_RSH and Local_SSH batch
+* managers.
*/
#include <iostream>
#include <sys/types.h>
#ifdef WIN32
-# include <direct.h>
+#include <direct.h>
#include "Batch_RunTimeException.hxx"
#else
-# include <sys/wait.h>
-# include <unistd.h>
+#include <sys/wait.h>
+#include <unistd.h>
#endif
#include <ctime>
#include <pthread.h>
// Constructeur
- BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host)
- throw(InvalidArgumentException,ConnexionFailureException)
- : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(), _thread_id_id_association_mutex(),
- _thread_id_id_association_cond(), _thread_id_id_association()
+ BatchManager_Local::BatchManager_Local(const FactBatchManager * parent, const char * host,
+ CommunicationProtocolType protocolType)
+ : BatchManager(parent, host), _connect(0), _threads_mutex(), _threads(),
+ _protocol(CommunicationProtocol::getInstance(protocolType)),
+ _thread_id_id_association_mutex(), _thread_id_id_association_cond(), _thread_id_id_association()
{
pthread_mutex_init(&_threads_mutex, NULL);
pthread_mutex_init(&_thread_id_id_association_mutex, NULL);
pthread_cond_destroy(&_thread_id_id_association_cond);
}
+ const CommunicationProtocol & BatchManager_Local::getProtocol() const
+ {
+ return _protocol;
+ }
+
// Methode pour le controle des jobs : soumet un job au gestionnaire
const JobId BatchManager_Local::submitJob(const Job & job)
{
}
+ vector<string> BatchManager_Local::exec_command(const Parametre & param) const
+ {
+ ostringstream exec_sub_cmd;
+
+#ifdef WIN32
+ char drive[_MAX_DRIVE];
+ _splitpath(string(param[WORKDIR]).c_str(), drive, NULL, NULL, NULL);
+ if (strlen(drive) > 0) exec_sub_cmd << drive << " && ";
+#endif
+
+ exec_sub_cmd << "cd " << param[WORKDIR] << " && " << param[EXECUTABLE];
+
+ if (param.find(ARGUMENTS) != param.end()) {
+ Versatile V = param[ARGUMENTS];
+ for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++) {
+ StringType argt = * static_cast<StringType *>(*it);
+ string arg = argt;
+ exec_sub_cmd << " " << arg;
+ }
+ }
+
+ string user;
+ Parametre::const_iterator it = param.find(USER);
+ if (it != param.end()) {
+ user = string(it->second);
+ }
+
+ return _protocol.getExecCommandArgs(exec_sub_cmd.str(), param[EXECUTIONHOST], user);
+ }
+
+
// Fabrique un identifiant unique pour les threads puisque le thread_id n'est pas unique
// au cours du temps (il peut etre reutilise lorsqu'un thread se termine)
// ATTENTION : cette methode est uniquement protegee par la section critique de l'association
string local = cp.getLocal();
string remote = cp.getRemote();
- string copy_cmd = p_ta->getBatchManager().copy_command("", "", local, user,
- executionhost, workdir + "/" + remote);
- UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
-#ifdef WIN32
- copy_cmd = string("\"") + copy_cmd + string("\"");
-#endif
+ std::cerr << workdir << std::endl;
+ std::cerr << remote << std::endl;
- if (system(copy_cmd.c_str()) ) {
+ int status = p_ta->getBatchManager().getProtocol().copyFile(local, "", "",
+ workdir + "/" + remote,
+ executionhost, user);
+ if (status) {
// Echec de la copie
rc |= 1;
} else {
// On forke/exec un nouveau process pour pouvoir controler le fils
// (plus finement qu'avec un appel system)
// int rc = system(commande.c_str());
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 10 && echo end", NULL};
+ //execv("/usr/bin/ssh", parmList);
#ifdef WIN32
child = p_ta->launchWin32ChildProcess();
p_ta->pere(child);
string local = cp.getLocal();
string remote = cp.getRemote();
- string copy_cmd = p_ta->getBatchManager().copy_command(user, executionhost, workdir + "/" + remote,
- "", "", local);
- UNDER_LOCK( cout << "Copying : " << copy_cmd << endl );
-#ifdef WIN32
- copy_cmd = string("\"") + copy_cmd + string("\"");
-#endif
-
- if (system(copy_cmd.c_str()) ) {
+ int status = p_ta->getBatchManager().getProtocol().copyFile(workdir + "/" + remote,
+ executionhost, user,
+ local, "", "");
+ if (status) {
// Echec de la copie
rc |= 1;
} else {
if ( (rc == 0) || (child < 0) ) {
std::vector<string>::const_iterator it;
for(it=files_to_delete.begin(); it!=files_to_delete.end(); it++) {
- string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
+ p_ta->getBatchManager().getProtocol().removeFile(*it, executionhost, user);
+/* string remove_cmd = p_ta->getBatchManager().remove_command(user, executionhost, *it);
UNDER_LOCK( cout << "Removing : " << remove_cmd << endl );
#ifdef WIN32
remove_cmd = string("\"") + remove_cmd + string("\"");
#endif
- system(remove_cmd.c_str());
+ system(remove_cmd.c_str());*/
}
}
int child_rc = 0;
pid_t child_wait_rc = waitpid(child, &child_rc, WNOHANG /* | WUNTRACED */);
if (child_wait_rc > 0) {
+ UNDER_LOCK( cout << "Status is: " << WIFEXITED( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WEXITSTATUS( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WIFSIGNALED( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WTERMSIG( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WCOREDUMP( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WIFSTOPPED( child_rc) << endl);
+ UNDER_LOCK( cout << "Status is: " << WSTOPSIG( child_rc) << endl);
+#ifdef WIFCONTINUED
+ UNDER_LOCK( cout << "Status is: " << WIFCONTINUED( child_rc) << endl); // not compilable on sarge
+#endif
if (WIFSTOPPED(child_rc)) {
// NOTA : pour rentrer dans cette section, il faut que le flag WUNTRACED
// soit positionne dans l'appel a waitpid ci-dessus. Ce flag est couramment
Parametre param = _job.getParametre();
Parametre::iterator it;
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+ //int result = execv("/usr/bin/ssh", parmList);
+ //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+ //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
try {
// EXECUTABLE is MANDATORY, if missing, we exit with failure notification
- char * execpath = NULL;
+ vector<string> command;
if (param.find(EXECUTABLE) != param.end()) {
- string executable = _bm.exec_command(param);
- execpath = new char [executable.size() + 1];
- strncpy(execpath, executable.c_str(), executable.size() + 1);
+ command = _bm.exec_command(param);
} else exit(1);
- string debug_command = execpath;
-
- string name = (param.find(NAME) != param.end()) ? param[NAME] : param[EXECUTABLE];
-
- char ** argv = NULL;
- if (param.find(ARGUMENTS) != param.end()) {
- Versatile V = param[ARGUMENTS];
-
- argv = new char * [V.size() + 2]; // 1 pour name et 1 pour le NULL terminal
-
- argv[0] = new char [name.size() + 1];
- strncpy(argv[0], name.c_str(), name.size() + 1);
-
- debug_command += string(" # ") + argv[0];
-
- int i = 1;
- for(Versatile::const_iterator it=V.begin(); it!=V.end(); it++, i++) {
- StringType argt = * static_cast<StringType *>(*it);
- string arg = argt;
- argv[i] = new char [arg.size() + 1];
- strncpy(argv[i], arg.c_str(), arg.size() + 1);
- debug_command += string(" # ") + argv[i];
- }
-
- // assert (i == V.size() + 1)
- argv[i] = NULL;
+ // Build the argument array argv from the command
+ char ** argv = new char * [command.size() + 1];
+ string comstr;
+ for (string::size_type i=0 ; i<command.size() ; i++) {
+ argv[i] = new char[command[i].size() + 1];
+ strncpy(argv[i], command[i].c_str(), command[i].size() + 1);
+ if (i>0) comstr += " # ";
+ comstr += command[i];
}
+ argv[command.size()] = NULL;
- UNDER_LOCK( cout << "*** debug_command = " << debug_command << endl );
-
-
+ UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
+ UNDER_LOCK( cout << "*** debug_command = " << argv[0] << endl );
// Create the environment for the new process. Note (RB): Here we change the environment for
// the process launched in local. It would seem more logical to set the environment for the
envp[i] = NULL;
}
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+ //int result = execv("/usr/bin/ssh", parmList);
+ //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+ //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
}
+ //char *const parmList[] = {"/usr/bin/ssh", "localhost", "-l", "aribes", "sleep 1 && echo end", NULL};
+ //int result = execv("/usr/bin/ssh", parmList);
+ //UNDER_LOCK( cout << "*** debug_command = " << result << endl );
+ //UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
// On cree une session pour le fils de facon a ce qu'il ne soit pas
// detruit lorsque le shell se termine (le shell ouvre une session et
// On execute la commande du fils
- execve(execpath, argv, envp);
-
+ int result = execve(argv[0], argv, envp);
+ UNDER_LOCK( cout << "*** debug_command = " << strerror(errno) << endl );
// No need to deallocate since nothing happens after a successful exec
// Normalement on ne devrait jamais arriver ici
try {
// EXECUTABLE is MANDATORY, if missing, we throw an exception
- string exec_command;
+ vector<string> exec_command;
if (param.find(EXECUTABLE) != param.end()) {
exec_command = _bm.exec_command(param);
} else {
throw RunTimeException("Parameter \"EXECUTABLE\" is mandatory for local batch submission");
}
- if (param.find(ARGUMENTS) != param.end()) {
- Versatile V = param[ARGUMENTS];
-
- for(Versatile::const_iterator it=V.begin() ; it!=V.end() ; it++) {
- StringType argt = * static_cast<StringType *>(*it);
- exec_command += string(" ") + string(argt);
- }
+ // Build the command string from the command argument vector
+ string comstr;
+ for (unsigned int i=0 ; i<exec_command.size() ; i++) {
+ if (i>0) comstr += " ";
+ comstr += exec_command[i];
}
- UNDER_LOCK( cout << "*** exec_command = " << exec_command << endl );
-
+ UNDER_LOCK( cout << "*** debug_command = " << comstr << endl );
// Create the environment for the new process. Note (RB): Here we change the environment for
// the process launched in local. It would seem more logical to set the environment for the
ZeroMemory( &pi, sizeof(pi) );
// Copy the command to a non-const buffer
- size_t str_size = exec_command.size();
- char buffer[str_size+1];
- exec_command.copy(buffer,str_size);
- buffer[str_size]='\0';
+ char * buffer = strdup(comstr.c_str());
// launch the new process
BOOL res = CreateProcess(NULL, buffer, NULL, NULL, FALSE,
- DETACHED_PROCESS, chNewEnv, NULL, &si, &pi);
+ CREATE_NO_WINDOW, chNewEnv, NULL, &si, &pi);
+ if (buffer) free(buffer);
if (!res) throw RunTimeException("Error while creating new process");
CloseHandle(pi.hThread);