Salome HOME
merge from BR_V4dev_resman 24oct07
[modules/kernel.git] / src / Container / SALOME_ContainerManager.cxx
1 // Copyright (C) 2005  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
2 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
3 // 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either 
7 // version 2.1 of the License.
8 // 
9 // This library is distributed in the hope that it will be useful 
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of 
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
12 // Lesser General Public License for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public  
15 // License along with this library; if not, write to the Free Software 
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 //
18 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 //
20 #include "SALOME_ContainerManager.hxx"
21 #include "SALOME_NamingService.hxx"
22 #include "OpUtil.hxx"
23 #include <sys/types.h>
24 #ifndef WNT
25 #include <unistd.h>
26 #endif
27 #include <vector>
28 #include "Utils_CorbaException.hxx"
29 #include "Batch_Date.hxx"
30
31 #ifdef WITH_PACO_PARALLEL
32 #include "PaCO++.h"
33 #endif
34
35 #define TIME_OUT_TO_LAUNCH_CONT 21
36
37 using namespace std;
38
39 vector<Engines::Container_ptr> SALOME_ContainerManager::_batchLaunchedContainers;
40
41 vector<Engines::Container_ptr>::iterator SALOME_ContainerManager::_batchLaunchedContainersIter;
42
43 const char *SALOME_ContainerManager::_ContainerManagerNameInNS = 
44   "/ContainerManager";
45
46 //=============================================================================
47 /*! 
48  *  Constructor
49  *  \param orb
50  *  Define a CORBA single thread policy for the server, which avoid to deal
51  *  with non thread-safe usage like Change_Directory in SALOME naming service
52  */
53 //=============================================================================
54
55 SALOME_ContainerManager::SALOME_ContainerManager(CORBA::ORB_ptr orb, PortableServer::POA_var poa, SALOME_ResourcesManager *rm, SALOME_NamingService *ns)
56 {
57   MESSAGE("constructor");
58   _NS = ns;
59   _ResManager = rm;
60   _id=0;
61
62   PortableServer::POAManager_var pman = poa->the_POAManager();
63   _orb = CORBA::ORB::_duplicate(orb) ;
64   CORBA::PolicyList policies;
65   policies.length(1);
66   PortableServer::ThreadPolicy_var threadPol = 
67     poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL);
68   policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol);
69
70   _poa = poa->create_POA("SThreadPOA",pman,policies);
71   threadPol->destroy();
72   PortableServer::ObjectId_var id = _poa->activate_object(this);
73   CORBA::Object_var obj = _poa->id_to_reference(id);
74   Engines::ContainerManager_var refContMan =
75     Engines::ContainerManager::_narrow(obj);
76
77   _NS->Register(refContMan,_ContainerManagerNameInNS);
78   MESSAGE("constructor end");
79 }
80
81 //=============================================================================
82 /*! 
83  * destructor
84  */
85 //=============================================================================
86
87 SALOME_ContainerManager::~SALOME_ContainerManager()
88 {
89   MESSAGE("destructor");
90 }
91
92 //=============================================================================
93 /*! CORBA method:
94  *  shutdown all the containers, then the ContainerManager servant
95  */
96 //=============================================================================
97
98 void SALOME_ContainerManager::Shutdown()
99 {
100   MESSAGE("Shutdown");
101   ShutdownContainers();
102   _NS->Destroy_Name(_ContainerManagerNameInNS);
103   PortableServer::ObjectId_var oid = _poa->servant_to_id(this);
104   _poa->deactivate_object(oid);
105   _remove_ref();
106 }
107
108 //=============================================================================
109 /*! CORBA Method:
110  *  Loop on all the containers listed in naming service, ask shutdown on each
111  */
112 //=============================================================================
113
114 void SALOME_ContainerManager::ShutdownContainers()
115 {
116   MESSAGE("ShutdownContainers");
117   bool isOK;
118   isOK = _NS->Change_Directory("/Containers");
119   if( isOK ){
120     vector<string> vec = _NS->list_directory_recurs();
121     list<string> lstCont;
122     for(vector<string>::iterator iter = vec.begin();iter!=vec.end();iter++){
123       SCRUTE((*iter));
124       CORBA::Object_var obj=_NS->Resolve((*iter).c_str());
125       Engines::Container_var cont=Engines::Container::_narrow(obj);
126       if(!CORBA::is_nil(cont)){
127         lstCont.push_back((*iter));
128       }
129     }
130     MESSAGE("Container list: ");
131     for(list<string>::iterator iter=lstCont.begin();iter!=lstCont.end();iter++){
132       SCRUTE((*iter));
133     }
134     for(list<string>::iterator iter=lstCont.begin();iter!=lstCont.end();iter++){
135       SCRUTE((*iter));
136       CORBA::Object_var obj=_NS->Resolve((*iter).c_str());
137       Engines::Container_var cont=Engines::Container::_narrow(obj);
138       if(!CORBA::is_nil(cont)){
139         MESSAGE("ShutdownContainers: " << (*iter));
140         cont->Shutdown();
141       }
142       else MESSAGE("ShutdownContainers: no container ref for " << (*iter));
143     }
144   }
145 }
146
147 //=============================================================================
148 /*! CORBA Method:
149  *  Find a suitable Container in a list of machines, or start one
150  *  \param params            Machine Parameters required for the container
151  *  \param possibleComputers list of machines usable for find or start
152  */
153 //=============================================================================
154
155 Engines::Container_ptr
156 SALOME_ContainerManager::
157 FindOrStartContainer(const Engines::MachineParameters& params,
158                      const Engines::MachineList& possibleComputers)
159 {
160   Engines::Container_ptr ret = FindContainer(params,possibleComputers);
161   if(!CORBA::is_nil(ret))
162     return ret;
163   MESSAGE("Container doesn't exist try to launch it ...");
164
165   return StartContainer(params,possibleComputers,Engines::P_FIRST);
166
167 }
168
169 //=============================================================================
170 /*! CORBA Method:
171  *  Start a suitable Container in a list of machines
172  *  \param params            Machine Parameters required for the container
173  *  \param possibleComputers list of machines usable for start
174  */
175 //=============================================================================
176
177 Engines::Container_ptr
178 SALOME_ContainerManager::
179 StartContainer(const Engines::MachineParameters& params,
180                const Engines::MachineList& possibleComputers,
181                Engines::ResPolicy policy)
182 {
183 #ifdef WITH_PACO_PARALLEL
184   std::string parallelLib(params.parallelLib);
185   if (parallelLib != "")
186     return FindOrStartParallelContainer(params, possibleComputers);
187 #endif
188   long id;
189   string containerNameInNS;
190   char idc[3*sizeof(long)];
191   Engines::Container_ptr ret = Engines::Container::_nil();
192
193   MESSAGE("SALOME_ContainerManager::StartContainer " <<
194           possibleComputers.length());
195
196   string theMachine;
197   try{
198     switch(policy){
199     case Engines::P_FIRST:
200       theMachine=_ResManager->FindFirst(possibleComputers);
201       break;
202     case Engines::P_CYCL:
203       theMachine=_ResManager->FindNext(possibleComputers);
204       break;
205     case Engines::P_BEST:
206       theMachine=_ResManager->FindBest(possibleComputers);
207       break;
208     }
209   }
210   catch( const SALOME_Exception &ex ){
211     MESSAGE(ex.what());
212     return Engines::Container::_nil();
213   }
214
215   MESSAGE("try to launch it on " << theMachine);
216
217   // Get Id for container: a parallel container registers in Naming Service
218   // on the machine where is process 0. ContainerManager does'nt know the name
219   // of this machine before the launch of the parallel container. So to get
220   // the IOR of the parallel container in Naming Service, ContainerManager
221   // gives a unique Id. The parallel container registers his name under
222   // /ContainerManager/Id directory in NamingService
223
224   id = GetIdForContainer();
225
226   string command;
227   if(theMachine==""){
228     MESSAGE("SALOME_ContainerManager::StartContainer : " <<
229             "no possible computer");
230     return Engines::Container::_nil();
231   }
232   else if(theMachine==GetHostname())
233     command=_ResManager->BuildCommandToLaunchLocalContainer(params,id);
234   else
235     command = _ResManager->BuildCommandToLaunchRemoteContainer(theMachine,params,id);
236
237   _ResManager->RmTmpFile();
238   int status=system(command.c_str());
239   if (status == -1){
240     MESSAGE("SALOME_LifeCycleCORBA::StartOrFindContainer rsh failed " <<
241             "(system command status -1)");
242     return Engines::Container::_nil();
243   }
244   else if (status == 217){
245     MESSAGE("SALOME_LifeCycleCORBA::StartOrFindContainer rsh failed " <<
246             "(system command status 217)");
247     return Engines::Container::_nil();
248   }
249   else{
250     int count=TIME_OUT_TO_LAUNCH_CONT;
251     MESSAGE("count = "<<count);
252     while ( CORBA::is_nil(ret) && count ){
253 #ifndef WNT
254       sleep( 1 ) ;
255 #else
256       Sleep(1000);
257 #endif
258       count-- ;
259       if ( count != 10 )
260         MESSAGE( count << ". Waiting for container on " << theMachine);
261
262       if(params.isMPI){
263         containerNameInNS = "/ContainerManager/id";
264         sprintf(idc,"%ld",id);
265         containerNameInNS += idc;
266       }
267       else
268         containerNameInNS = _NS->BuildContainerNameForNS(params,theMachine.c_str());
269
270       SCRUTE(containerNameInNS);
271       CORBA::Object_var obj = _NS->Resolve(containerNameInNS.c_str());
272       ret=Engines::Container::_narrow(obj);
273     }
274     
275     if ( CORBA::is_nil(ret) )
276       MESSAGE("SALOME_LifeCycleCORBA::StartOrFindContainer rsh failed");
277
278     return ret;
279   }
280 }
281
282 //=============================================================================
283 /*! CORBA Method:
284  *  Start a suitable Container in a list of machines
285  *  \param params            Machine Parameters required for the container
286  *  \param possibleComputers list of machines usable for start
287  */
288 //=============================================================================
289
290 Engines::Container_ptr
291 SALOME_ContainerManager::
292 StartContainer(const Engines::MachineParameters& params,
293                Engines::ResPolicy policy,
294                const Engines::CompoList& componentList)
295 {
296   Engines::MachineList_var possibleComputers = _ResManager->GetFittingResources(params,componentList);
297   return StartContainer(params,possibleComputers,policy);
298 }
299
300 #ifdef WITH_PACO_PARALLEL
301 //=============================================================================
302 /*! CORBA Method:
303  *  Find or Start a suitable PaCO++ Parallel Container in a list of machines.
304  *  \param params            Machine Parameters required for the container
305  *  \param possibleComputers list of machines usable for find or start
306  *
307  *  \return CORBA container reference.
308  */
309 //=============================================================================
310 Engines::Container_ptr
311 SALOME_ContainerManager::
312 FindOrStartParallelContainer(const Engines::MachineParameters& params_const,
313                              const Engines::MachineList& possibleComputers)
314 {
315   CORBA::Object_var obj;
316   Engines::Container_ptr ret = Engines::Container::_nil();
317   Engines::MachineParameters params(params_const);
318
319   // Step 1 : Try to find a suitable container
320   // Currently not as good as could be since
321   // we have to verified the number of nodes of the container
322   // if a user tell that.
323   ret = FindContainer(params, possibleComputers);
324
325   if(CORBA::is_nil(ret)) {
326     // Step 2 : Starting a new parallel container
327     INFOS("[FindOrStartParallelContainer] Starting a parallel container");
328     
329     // Step 2.1 : Choose a computer
330     string theMachine = _ResManager->FindFirst(possibleComputers);
331     if(theMachine == "") {
332       INFOS("[FindOrStartParallelContainer] !!!!!!!!!!!!!!!!!!!!!!!!!!");
333       INFOS("[FindOrStartParallelContainer] No possible computer found");
334       INFOS("[FindOrStartParallelContainer] !!!!!!!!!!!!!!!!!!!!!!!!!!");
335     }
336     else {
337       INFOS("[FindOrStartParallelContainer] on machine : " << theMachine);
338       string command;
339       if(theMachine == GetHostname()) {
340         // Step 3 : starting parallel container proxy
341         params.hostname = CORBA::string_dup(theMachine.c_str());
342         Engines::MachineParameters params_proxy(params);
343         command = _ResManager->BuildCommandToLaunchLocalParallelContainer("SALOME_ParallelContainerProxy", params_proxy, "xterm");
344         // LaunchParallelContainer uses this value to know if it launches the proxy or the nodes
345         params_proxy.nb_component_nodes = 0;
346         obj = LaunchParallelContainer(command, params_proxy, _NS->ContainerName(params));
347         ret = Engines::Container::_narrow(obj);
348
349         // Step 4 : starting parallel container nodes
350         command = _ResManager->BuildCommandToLaunchLocalParallelContainer("SALOME_ParallelContainerNode", params, "xterm");
351         string name = _NS->ContainerName(params) + "Node";
352         LaunchParallelContainer(command, params, name);
353
354         // Step 5 : connecting nodes and the proxy to actually create a parallel container
355         try {
356         for (int i = 0; i < params.nb_component_nodes; i++) {
357
358         char buffer [5];
359 #ifndef WNT
360           snprintf(buffer,5,"%d",i);
361 #else
362           _snprintf(buffer,5,"%d",i);
363 #endif
364         string name_cont = name + string(buffer);
365
366         string theNodeMachine(CORBA::string_dup(params.hostname));
367         string containerNameInNS = _NS->BuildContainerNameForNS(name_cont.c_str(),theNodeMachine.c_str());
368         int count = TIME_OUT_TO_LAUNCH_CONT;
369         obj = _NS->Resolve(containerNameInNS.c_str());
370         while (CORBA::is_nil(obj) && count) {
371           INFOS("[FindOrStartParallelContainer] CONNECTION FAILED !!!!!!!!!!!!!!!!!!!!!!!!");
372 #ifndef WNT
373           sleep(1) ;
374 #else
375           Sleep(1000);
376 #endif
377           count-- ;
378           obj = _NS->Resolve(containerNameInNS.c_str());
379         }
380
381         PaCO::InterfaceParallel_var node = PaCO::InterfaceParallel::_narrow(obj);
382         MESSAGE("[FindOrStartParallelContainer] Deploying node : " << name);
383         node->deploy(i);
384         }
385         }
386         catch(CORBA::SystemException& e)
387         {
388           INFOS("Caught CORBA::SystemException. : " << e);
389         }
390         catch(PortableServer::POA::ServantAlreadyActive&)
391         {
392           INFOS("Caught CORBA::ServantAlreadyActiveException");
393         }
394         catch(CORBA::Exception&)
395         {
396           INFOS("Caught CORBA::Exception.");
397         }
398         catch(std::exception& exc)
399         {
400           INFOS("Caught std::exception - "<<exc.what()); 
401         }
402         catch(...)
403         {
404           INFOS("Caught unknown exception.");
405         }
406         INFOS("[FindOrStartParallelContainer] node " << name << " deployed");
407       }
408
409       else {
410         INFOS("[FindOrStartParallelContainer] Currently parallel containers are launched only on the local host");
411       }
412     }
413   }
414   return ret;
415 }
416 #else
417 //=============================================================================
418 /*! CORBA Method:
419  *  Find or Start a suitable PaCO++ Parallel Container in a list of machines.
420  *  \param params            Machine Parameters required for the container
421  *  \param possibleComputers list of machines usable for find or start
422  *
423  *  \return CORBA container reference.
424  */
425 //=============================================================================
426 Engines::Container_ptr
427 SALOME_ContainerManager::
428 FindOrStartParallelContainer(const Engines::MachineParameters& params,
429                              const Engines::MachineList& possibleComputers)
430 {
431   Engines::Container_ptr ret = Engines::Container::_nil();
432   INFOS("[FindOrStartParallelContainer] is disabled !");
433   INFOS("[FindOrStartParallelContainer] recompile SALOME Kernel to enable parallel extension");
434   return ret;
435 }
436 #endif
437
438 //=============================================================================
439 /*! CORBA Method:
440  *  Give a suitable Container in a list of machines
441  *  \param params            Machine Parameters required for the container
442  *  \param possibleComputers list of machines usable for start
443  */
444 //=============================================================================
445
446 Engines::Container_ptr
447 SALOME_ContainerManager::
448 GiveContainer(const Engines::MachineParameters& params,
449                Engines::ResPolicy policy,
450                const Engines::CompoList& componentList)
451 {
452   char *valenv=getenv("SALOME_BATCH");
453   if(valenv)
454     if (strcmp(valenv,"1")==0)
455       {
456         if(_batchLaunchedContainers.empty())
457           fillBatchLaunchedContainers();
458         return *(_batchLaunchedContainersIter++);
459       }
460   return StartContainer(params,policy,componentList);
461 }
462
463 //=============================================================================
464 /*! 
465  * 
466  */
467 //=============================================================================
468
469 Engines::Container_ptr
470 SALOME_ContainerManager::
471 FindContainer(const Engines::MachineParameters& params,
472               const char *theMachine)
473 {
474   string containerNameInNS(_NS->BuildContainerNameForNS(params,theMachine));
475   CORBA::Object_var obj = _NS->Resolve(containerNameInNS.c_str());
476   if( !CORBA::is_nil(obj) )
477     return Engines::Container::_narrow(obj);
478   else
479     return Engines::Container::_nil();
480 }
481
482 //=============================================================================
483 /*! 
484  * 
485  */
486 //=============================================================================
487
488 Engines::Container_ptr
489 SALOME_ContainerManager::
490 FindContainer(const Engines::MachineParameters& params,
491               const Engines::MachineList& possibleComputers)
492 {
493   MESSAGE("FindContainer "<<possibleComputers.length());
494   for(unsigned int i=0;i<possibleComputers.length();i++)
495     {
496       MESSAGE("FindContainer possible " << possibleComputers[i]);
497       Engines::Container_ptr cont = FindContainer(params,possibleComputers[i]);
498       if( !CORBA::is_nil(cont) )
499         return cont;
500     }
501   MESSAGE("FindContainer: not found");
502   return Engines::Container::_nil();
503 }
504
505 //=============================================================================
506 /*! This method launches the parallel container.
507  *  It will may be placed on the ressources manager.
508  *
509  * \param command to launch
510  * \param container's parameters
511  * \param name of the container
512  *
513  * \return CORBA container reference
514  */
515 //=============================================================================
516 CORBA::Object_ptr 
517 SALOME_ContainerManager::LaunchParallelContainer(const std::string& command, 
518                                                  const Engines::MachineParameters& params,
519                                                  const std::string& name)
520 {
521   CORBA::Object_ptr obj = CORBA::Object::_nil();
522   string containerNameInNS;
523
524   if (params.nb_component_nodes == 0) {
525     INFOS("[LaunchParallelContainer] launching the proxy of the parallel container");
526     int status = system(command.c_str());
527     if (status == -1) {
528       INFOS("[LaunchParallelContainer] failed : system command status -1");
529     }
530     else if (status == 217) {
531       INFOS("[LaunchParallelContainer] failed : system command status 217");
532     }
533
534     int count = TIME_OUT_TO_LAUNCH_CONT;
535     string theMachine(CORBA::string_dup(params.hostname));
536     containerNameInNS = _NS->BuildContainerNameForNS((char*) name.c_str(),theMachine.c_str());
537
538     INFOS("[LaunchContainer]  Waiting for Parallel Container proxy on " << theMachine);
539     while (CORBA::is_nil(obj) && count) {
540 #ifndef WNT
541       sleep(1) ;
542 #else
543       Sleep(1000);
544 #endif
545       count-- ;
546       obj = _NS->Resolve(containerNameInNS.c_str());
547     }
548   }
549   else {
550     INFOS("[LaunchParallelContainer] launching the nodes of the parallel container");
551     int status = system(command.c_str());
552     if (status == -1) {
553       INFOS("[LaunchParallelContainer] failed : system command status -1");
554     }
555     else if (status == 217) {
556       INFOS("[LaunchParallelContainer] failed : system command status 217");
557     }
558     // We are waiting all the nodes
559     for (int i = 0; i < params.nb_component_nodes; i++) {
560       obj = CORBA::Object::_nil();
561       int count = TIME_OUT_TO_LAUNCH_CONT;
562
563       // Name of the node
564       char buffer [5];
565 #ifndef WNT
566       snprintf(buffer,5,"%d",i);
567 #else
568       _snprintf(buffer,5,"%d",i);
569 #endif
570
571       string name_cont = name + string(buffer);
572
573       // I don't like this...
574       string theMachine(CORBA::string_dup(params.hostname));
575       containerNameInNS = _NS->BuildContainerNameForNS((char*) name_cont.c_str(),theMachine.c_str());
576       cerr << "[LaunchContainer]  Waiting for Parllel Container node " << containerNameInNS << " on " << theMachine << endl;
577       while (CORBA::is_nil(obj) && count) {
578 #ifndef WNT
579         sleep(1) ;
580 #else
581         Sleep(1000);
582 #endif
583         count-- ;
584         obj = _NS->Resolve(containerNameInNS.c_str());
585       }
586     }
587   }
588
589   if ( CORBA::is_nil(obj) ) {
590     INFOS("[LaunchParallelContainer] failed");
591   }
592   return obj;
593 }
594
595 //=============================================================================
596 /*! 
597  * Get Id for container: a parallel container registers in Naming Service
598  * on the machine where is process 0. ContainerManager does'nt know the name
599  * of this machine before the launch of the parallel container. So to get
600  * the IOR of the parallel container in Naming Service, ContainerManager
601  * gives a unique Id. The parallel container registers his name under
602  * /ContainerManager/Id directory in NamingService
603  */
604 //=============================================================================
605
606
607 long SALOME_ContainerManager::GetIdForContainer(void)
608 {
609   _id++;
610   return _id;
611 }
612
613 void SALOME_ContainerManager::fillBatchLaunchedContainers()
614 {
615   _batchLaunchedContainers.clear();
616   _NS->Change_Directory("/Containers");
617   vector<string> vec = _NS->list_directory_recurs();
618   for(vector<string>::iterator iter = vec.begin();iter!=vec.end();iter++){
619     CORBA::Object_var obj=_NS->Resolve((*iter).c_str());
620     Engines::Container_ptr cont=Engines::Container::_narrow(obj);
621     if(!CORBA::is_nil(cont)){
622       _batchLaunchedContainers.push_back(cont);
623     }
624   }
625   _batchLaunchedContainersIter=_batchLaunchedContainers.begin();
626 }