Salome HOME
Corrected implementation of MESSAGE functionality on Windows
[modules/kernel.git] / src / Container / SALOME_ContainerManager.cxx
1 // Copyright (C) 2005  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
2 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
3 // 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either 
7 // version 2.1 of the License.
8 // 
9 // This library is distributed in the hope that it will be useful 
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of 
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
12 // Lesser General Public License for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public  
15 // License along with this library; if not, write to the Free Software 
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17 //
18 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 //
20 #include "SALOME_ContainerManager.hxx"
21 #include "SALOME_NamingService.hxx"
22 #include "OpUtil.hxx"
23 #include <sys/types.h>
24 #ifndef WNT
25 #include <unistd.h>
26 #endif
27 #include <vector>
28 #include "Utils_CorbaException.hxx"
29
30 #ifdef WITH_PACO_PARALLEL
31 #include "PaCO++.h"
32 #endif
33
34 #define TIME_OUT_TO_LAUNCH_CONT 21
35
36 using namespace std;
37
38 const char *SALOME_ContainerManager::_ContainerManagerNameInNS = 
39   "/ContainerManager";
40
41 //=============================================================================
42 /*! 
43  *  Constructor
44  *  \param orb
45  *  Define a CORBA single thread policy for the server, which avoid to deal
46  *  with non thread-safe usage like Change_Directory in SALOME naming service
47  */
48 //=============================================================================
49
50 SALOME_ContainerManager::SALOME_ContainerManager(CORBA::ORB_ptr orb)
51 {
52   MESSAGE("constructor");
53   _NS = new SALOME_NamingService(orb);
54   _ResManager = new SALOME_ResourcesManager(orb);
55   _id=0;
56   PortableServer::POA_var root_poa = PortableServer::POA::_the_root_poa();
57   PortableServer::POAManager_var pman = root_poa->the_POAManager();
58   PortableServer::POA_var my_poa;
59
60   CORBA::PolicyList policies;
61   policies.length(1);
62   PortableServer::ThreadPolicy_var threadPol = 
63     root_poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL);
64   policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol);
65
66   my_poa = 
67     root_poa->create_POA("SThreadPOA",pman,policies);
68   threadPol->destroy();
69   PortableServer::ObjectId_var id = my_poa->activate_object(this);
70   CORBA::Object_var obj = my_poa->id_to_reference(id);
71   Engines::ContainerManager_var refContMan =
72     Engines::ContainerManager::_narrow(obj);
73
74   _NS->Register(refContMan,_ContainerManagerNameInNS);
75   MESSAGE("constructor end");
76 }
77
78 //=============================================================================
79 /*! 
80  * destructor
81  */
82 //=============================================================================
83
84 SALOME_ContainerManager::~SALOME_ContainerManager()
85 {
86   MESSAGE("destructor");
87   delete _NS;
88   delete _ResManager;
89 }
90
91 //=============================================================================
92 /*! CORBA method:
93  *  shutdown all the containers, then the ContainerManager servant
94  */
95 //=============================================================================
96
97 void SALOME_ContainerManager::Shutdown()
98 {
99   MESSAGE("Shutdown");
100   ShutdownContainers();
101   PortableServer::ObjectId_var oid = _default_POA()->servant_to_id(this);
102   _default_POA()->deactivate_object(oid);
103   _remove_ref();
104   
105 }
106
107 //=============================================================================
108 /*! CORBA Method:
109  *  Loop on all the containers listed in naming service, ask shutdown on each
110  */
111 //=============================================================================
112
113 void SALOME_ContainerManager::ShutdownContainers()
114 {
115   MESSAGE("ShutdownContainers");
116   _NS->Change_Directory("/Containers");
117   vector<string> vec = _NS->list_directory_recurs();
118   list<string> lstCont;
119   for(vector<string>::iterator iter = vec.begin();iter!=vec.end();iter++)
120     {
121       SCRUTE((*iter));
122       CORBA::Object_var obj=_NS->Resolve((*iter).c_str());
123       Engines::Container_var cont=Engines::Container::_narrow(obj);
124       if(!CORBA::is_nil(cont))
125         {
126           lstCont.push_back((*iter));
127         }
128     }
129   MESSAGE("Container list: ");
130   for(list<string>::iterator iter=lstCont.begin();iter!=lstCont.end();iter++)
131     {
132       SCRUTE((*iter));
133     }
134   for(list<string>::iterator iter=lstCont.begin();iter!=lstCont.end();iter++)
135     {
136       SCRUTE((*iter));
137       CORBA::Object_var obj=_NS->Resolve((*iter).c_str());
138       Engines::Container_var cont=Engines::Container::_narrow(obj);
139       if(!CORBA::is_nil(cont))
140         {
141           MESSAGE("ShutdownContainers: " << (*iter));
142           cont->Shutdown();
143         }
144       else MESSAGE("ShutdownContainers: no container ref for " << (*iter));
145     }
146 }
147
148 //=============================================================================
149 /*! CORBA Method:
150  *  Find a suitable Container in a list of machines, or start one
151  *  \param params            Machine Parameters required for the container
152  *  \param possibleComputers list of machines usable for find or start
153  */
154 //=============================================================================
155
156 Engines::Container_ptr
157 SALOME_ContainerManager::
158 FindOrStartContainer(const Engines::MachineParameters& params,
159                      const Engines::MachineList& possibleComputers)
160 {
161   long id;
162   string containerNameInNS;
163   char idc[3*sizeof(long)];
164
165   Engines::Container_ptr ret = FindContainer(params,possibleComputers);
166   if(!CORBA::is_nil(ret))
167     return ret;
168   MESSAGE("Container doesn't exist try to launch it ...");
169
170   return StartContainer(params,possibleComputers,Engines::P_FIRST);
171
172 }
173
174 //=============================================================================
175 /*! CORBA Method:
176  *  Start a suitable Container in a list of machines
177  *  \param params            Machine Parameters required for the container
178  *  \param possibleComputers list of machines usable for start
179  */
180 //=============================================================================
181
182 Engines::Container_ptr
183 SALOME_ContainerManager::
184 StartContainer(const Engines::MachineParameters& params,
185                const Engines::MachineList& possibleComputers,
186                Engines::ResPolicy policy)
187 {
188   long id;
189   string containerNameInNS;
190   char idc[3*sizeof(long)];
191   Engines::Container_ptr ret = Engines::Container::_nil();
192
193   MESSAGE("SALOME_ContainerManager::StartContainer " <<
194           possibleComputers.length());
195
196   string theMachine;
197   try{
198     switch(policy){
199     case Engines::P_FIRST:
200       theMachine=_ResManager->FindFirst(possibleComputers);
201       break;
202     case Engines::P_CYCL:
203       theMachine=_ResManager->FindNext(possibleComputers);
204       break;
205     case Engines::P_BEST:
206       theMachine=_ResManager->FindBest(possibleComputers);
207       break;
208     }
209   }
210   catch( const SALOME_Exception &ex ){
211     MESSAGE(ex.what());
212     return Engines::Container::_nil();
213   }
214
215   MESSAGE("try to launch it on " << theMachine);
216
217   // Get Id for container: a parallel container registers in Naming Service
218   // on the machine where is process 0. ContainerManager does'nt know the name
219   // of this machine before the launch of the parallel container. So to get
220   // the IOR of the parallel container in Naming Service, ContainerManager
221   // gives a unique Id. The parallel container registers his name under
222   // /ContainerManager/Id directory in NamingService
223
224   id = GetIdForContainer();
225
226   string command;
227   if(theMachine==""){
228     MESSAGE("SALOME_ContainerManager::StartContainer : " <<
229             "no possible computer");
230     return Engines::Container::_nil();
231   }
232   else if(theMachine==GetHostname())
233     command=_ResManager->BuildCommandToLaunchLocalContainer(params,id);
234   else
235     command = _ResManager->BuildCommandToLaunchRemoteContainer(theMachine,params,id);
236
237   _ResManager->RmTmpFile();
238   int status=system(command.c_str());
239   if (status == -1){
240     MESSAGE("SALOME_LifeCycleCORBA::StartOrFindContainer rsh failed " <<
241             "(system command status -1)");
242     return Engines::Container::_nil();
243   }
244   else if (status == 217){
245     MESSAGE("SALOME_LifeCycleCORBA::StartOrFindContainer rsh failed " <<
246             "(system command status 217)");
247     return Engines::Container::_nil();
248   }
249   else{
250     int count=TIME_OUT_TO_LAUNCH_CONT;
251     MESSAGE("count = "<<count);
252     while ( CORBA::is_nil(ret) && count ){
253 #ifndef WNT
254       sleep( 1 ) ;
255 #else
256       Sleep(1000);
257 #endif
258       count-- ;
259       if ( count != 10 )
260         MESSAGE( count << ". Waiting for container on " << theMachine);
261
262       if(params.isMPI){
263         containerNameInNS = "/ContainerManager/id";
264         sprintf(idc,"%ld",id);
265         containerNameInNS += idc;
266       }
267       else
268         containerNameInNS = _NS->BuildContainerNameForNS(params,theMachine.c_str());
269
270       SCRUTE(containerNameInNS);
271       CORBA::Object_var obj = _NS->Resolve(containerNameInNS.c_str());
272       ret=Engines::Container::_narrow(obj);
273     }
274     
275     if ( CORBA::is_nil(ret) )
276       MESSAGE("SALOME_LifeCycleCORBA::StartOrFindContainer rsh failed");
277
278     return ret;
279   }
280 }
281
282 //=============================================================================
283 /*! CORBA Method:
284  *  Start a suitable Container in a list of machines
285  *  \param params            Machine Parameters required for the container
286  *  \param possibleComputers list of machines usable for start
287  */
288 //=============================================================================
289
290 Engines::Container_ptr
291 SALOME_ContainerManager::
292 StartContainer(const Engines::MachineParameters& params,
293                Engines::ResPolicy policy)
294 {
295   Engines::MachineList_var possibleComputers = GetFittingResources(params,"");
296   return StartContainer(params,possibleComputers,policy);
297 }
298
299 #ifdef WITH_PACO_PARALLEL
300 //=============================================================================
301 /*! CORBA Method:
302  *  Find or Start a suitable PaCO++ Parallel Container in a list of machines.
303  *  \param params            Machine Parameters required for the container
304  *  \param possibleComputers list of machines usable for find or start
305  *
306  *  \return CORBA container reference.
307  */
308 //=============================================================================
309 Engines::Container_ptr
310 SALOME_ContainerManager::
311 FindOrStartParallelContainer(const Engines::MachineParameters& params_const,
312                              const Engines::MachineList& possibleComputers)
313 {
314   CORBA::Object_var obj;
315   Engines::Container_ptr ret = Engines::Container::_nil();
316   Engines::MachineParameters params(params_const);
317
318   // Step 1 : Try to find a suitable container
319   // Currently not as good as could be since
320   // we have to verified the number of nodes of the container
321   // if a user tell that.
322   ret = FindContainer(params, possibleComputers);
323
324   if(CORBA::is_nil(ret)) {
325     // Step 2 : Starting a new parallel container
326     INFOS("[FindOrStartParallelContainer] Starting a parallel container");
327     
328     // Step 2.1 : Choose a computer
329     string theMachine = _ResManager->FindBest(possibleComputers);
330     if(theMachine == "") {
331       INFOS("[FindOrStartParallelContainer] !!!!!!!!!!!!!!!!!!!!!!!!!!");
332       INFOS("[FindOrStartParallelContainer] No possible computer found");
333       INFOS("[FindOrStartParallelContainer] !!!!!!!!!!!!!!!!!!!!!!!!!!");
334     }
335     else {
336       INFOS("[FindOrStartParallelContainer] on machine : " << theMachine);
337       string command;
338       if(theMachine == GetHostname()) {
339         // Step 3 : starting parallel container proxy
340         params.hostname = CORBA::string_dup(theMachine.c_str());
341         Engines::MachineParameters params_proxy(params);
342         command = _ResManager->BuildCommandToLaunchLocalParallelContainer("SALOME_ParallelContainerProxy", params_proxy, "xterm");
343         // LaunchParallelContainer uses this value to know if it launches the proxy or the nodes
344         params_proxy.nb_component_nodes = 0;
345         obj = LaunchParallelContainer(command, params_proxy, _NS->ContainerName(params));
346         ret = Engines::Container::_narrow(obj);
347
348         // Step 4 : starting parallel container nodes
349         command = _ResManager->BuildCommandToLaunchLocalParallelContainer("SALOME_ParallelContainerNode", params, "xterm");
350         string name = _NS->ContainerName(params) + "Node";
351         LaunchParallelContainer(command, params, name);
352
353         // Step 5 : connecting nodes and the proxy to actually create a parallel container
354         try {
355         for (int i = 0; i < params.nb_component_nodes; i++) {
356
357         char buffer [5];
358 #ifndef WNT
359           snprintf(buffer,5,"%d",i);
360 #else
361           _snprintf(buffer,5,"%d",i);
362 #endif
363         string name_cont = name + string(buffer);
364
365         string theNodeMachine(CORBA::string_dup(params.hostname));
366         string containerNameInNS = _NS->BuildContainerNameForNS(name_cont.c_str(),theNodeMachine.c_str());
367         int count = TIME_OUT_TO_LAUNCH_CONT;
368         obj = _NS->Resolve(containerNameInNS.c_str());
369         while (CORBA::is_nil(obj) && count) {
370           INFOS("[FindOrStartParallelContainer] CONNECTION FAILED !!!!!!!!!!!!!!!!!!!!!!!!");
371 #ifndef WNT
372           sleep(1) ;
373 #else
374           Sleep(1000);
375 #endif
376           count-- ;
377           obj = _NS->Resolve(containerNameInNS.c_str());
378         }
379
380         PaCO::InterfaceParallel_var node = PaCO::InterfaceParallel::_narrow(obj);
381         MESSAGE("[FindOrStartParallelContainer] Deploying node : " << name);
382         node->deploy(i);
383         }
384         }
385         catch(CORBA::SystemException& e)
386         {
387           INFOS("Caught CORBA::SystemException. : " << e);
388         }
389         catch(PortableServer::POA::ServantAlreadyActive&)
390         {
391           INFOS("Caught CORBA::ServantAlreadyActiveException");
392         }
393         catch(CORBA::Exception&)
394         {
395           INFOS("Caught CORBA::Exception.");
396         }
397         catch(std::exception& exc)
398         {
399           INFOS("Caught std::exception - "<<exc.what()); 
400         }
401         catch(...)
402         {
403           INFOS("Caught unknown exception.");
404         }
405         INFOS("[FindOrStartParallelContainer] node " << name << " deployed");
406       }
407
408       else {
409         INFOS("[FindOrStartParallelContainer] Currently parallel containers are launched only on the local host");
410       }
411     }
412   }
413   return ret;
414 }
415 #else
416 //=============================================================================
417 /*! CORBA Method:
418  *  Find or Start a suitable PaCO++ Parallel Container in a list of machines.
419  *  \param params            Machine Parameters required for the container
420  *  \param possibleComputers list of machines usable for find or start
421  *
422  *  \return CORBA container reference.
423  */
424 //=============================================================================
425 Engines::Container_ptr
426 SALOME_ContainerManager::
427 FindOrStartParallelContainer(const Engines::MachineParameters& params,
428                              const Engines::MachineList& possibleComputers)
429 {
430   Engines::Container_ptr ret = Engines::Container::_nil();
431   INFOS("[FindOrStartParallelContainer] is disabled !");
432   INFOS("[FindOrStartParallelContainer] recompile SALOME Kernel to enable parallel extension");
433   return ret;
434 }
435 #endif
436
437 //=============================================================================
438 /*! 
439  * 
440  */
441 //=============================================================================
442
443 Engines::MachineList *
444 SALOME_ContainerManager::
445 GetFittingResources(const Engines::MachineParameters& params,
446                     const char *componentName)
447 {
448   MESSAGE("SALOME_ContainerManager::GetFittingResources");
449   Engines::MachineList *ret=new Engines::MachineList;
450   vector<string> vec;
451   try
452     {
453       vec = _ResManager->GetFittingResources(params,componentName);
454     }
455   catch(const SALOME_Exception &ex)
456     {
457       INFOS("Caught exception.");
458       THROW_SALOME_CORBA_EXCEPTION(ex.what(),SALOME::BAD_PARAM);
459       //return ret;
460     }
461
462   //  MESSAGE("Machine list length "<<vec.size());
463   ret->length(vec.size());
464   for(unsigned int i=0;i<vec.size();i++)
465     {
466       (*ret)[i]=(vec[i]).c_str();
467     }
468   return ret;
469 }
470
471 //=============================================================================
472 /*! 
473  * 
474  */
475 //=============================================================================
476
477 char*
478 SALOME_ContainerManager::
479 FindFirst(const Engines::MachineList& possibleComputers)
480 {
481   string theMachine=_ResManager->FindFirst(possibleComputers);
482   return CORBA::string_dup(theMachine.c_str());
483 }
484
485 //=============================================================================
486 /*! 
487  * 
488  */
489 //=============================================================================
490
491 Engines::Container_ptr
492 SALOME_ContainerManager::
493 FindContainer(const Engines::MachineParameters& params,
494               const char *theMachine)
495 {
496   string containerNameInNS(_NS->BuildContainerNameForNS(params,theMachine));
497   CORBA::Object_var obj = _NS->Resolve(containerNameInNS.c_str());
498   if( !CORBA::is_nil(obj) )
499     return Engines::Container::_narrow(obj);
500   else
501     return Engines::Container::_nil();
502 }
503
504 //=============================================================================
505 /*! 
506  * 
507  */
508 //=============================================================================
509
510 Engines::Container_ptr
511 SALOME_ContainerManager::
512 FindContainer(const Engines::MachineParameters& params,
513               const Engines::MachineList& possibleComputers)
514 {
515   MESSAGE("FindContainer "<<possibleComputers.length());
516   for(unsigned int i=0;i<possibleComputers.length();i++)
517     {
518       MESSAGE("FindContainer possible " << possibleComputers[i]);
519       Engines::Container_ptr cont = FindContainer(params,possibleComputers[i]);
520       if( !CORBA::is_nil(cont) )
521         return cont;
522     }
523   MESSAGE("FindContainer: not found");
524   return Engines::Container::_nil();
525 }
526
527 //=============================================================================
528 /*! This method launches the parallel container.
529  *  It will may be placed on the ressources manager.
530  *
531  * \param command to launch
532  * \param container's parameters
533  * \param name of the container
534  *
535  * \return CORBA container reference
536  */
537 //=============================================================================
538 CORBA::Object_ptr 
539 SALOME_ContainerManager::LaunchParallelContainer(const std::string& command, 
540                                                  const Engines::MachineParameters& params,
541                                                  const std::string& name)
542 {
543   CORBA::Object_ptr obj = CORBA::Object::_nil();
544   string containerNameInNS;
545
546   if (params.nb_component_nodes == 0) {
547     INFOS("[LaunchParallelContainer] launching the proxy of the parallel container");
548     int status = system(command.c_str());
549     if (status == -1) {
550       INFOS("[LaunchParallelContainer] failed : system command status -1");
551     }
552     else if (status == 217) {
553       INFOS("[LaunchParallelContainer] failed : system command status 217");
554     }
555
556     int count = TIME_OUT_TO_LAUNCH_CONT;
557     string theMachine(CORBA::string_dup(params.hostname));
558     containerNameInNS = _NS->BuildContainerNameForNS((char*) name.c_str(),theMachine.c_str());
559
560     INFOS("[LaunchContainer]  Waiting for Parallel Container proxy on " << theMachine);
561     while (CORBA::is_nil(obj) && count) {
562 #ifndef WNT
563       sleep(1) ;
564 #else
565       Sleep(1000);
566 #endif
567       count-- ;
568       obj = _NS->Resolve(containerNameInNS.c_str());
569     }
570   }
571   else {
572     INFOS("[LaunchParallelContainer] launching the nodes of the parallel container");
573     int status = system(command.c_str());
574     if (status == -1) {
575       INFOS("[LaunchParallelContainer] failed : system command status -1");
576     }
577     else if (status == 217) {
578       INFOS("[LaunchParallelContainer] failed : system command status 217");
579     }
580     // We are waiting all the nodes
581     for (int i = 0; i < params.nb_component_nodes; i++) {
582       obj = CORBA::Object::_nil();
583       int count = TIME_OUT_TO_LAUNCH_CONT;
584
585       // Name of the node
586       char buffer [5];
587 #ifndef WNT
588       snprintf(buffer,5,"%d",i);
589 #else
590       _snprintf(buffer,5,"%d",i);
591 #endif
592
593       string name_cont = name + string(buffer);
594
595       // I don't like this...
596       string theMachine(CORBA::string_dup(params.hostname));
597       containerNameInNS = _NS->BuildContainerNameForNS((char*) name_cont.c_str(),theMachine.c_str());
598       cerr << "[LaunchContainer]  Waiting for Parllel Container node " << containerNameInNS << " on " << theMachine << endl;
599       while (CORBA::is_nil(obj) && count) {
600 #ifndef WNT
601         sleep(1) ;
602 #else
603         Sleep(1000);
604 #endif
605         count-- ;
606         obj = _NS->Resolve(containerNameInNS.c_str());
607       }
608     }
609   }
610
611   if ( CORBA::is_nil(obj) ) {
612     INFOS("[LaunchParallelContainer] failed");
613   }
614   return obj;
615 }
616
617 //=============================================================================
618 /*! 
619  * Get Id for container: a parallel container registers in Naming Service
620  * on the machine where is process 0. ContainerManager does'nt know the name
621  * of this machine before the launch of the parallel container. So to get
622  * the IOR of the parallel container in Naming Service, ContainerManager
623  * gives a unique Id. The parallel container registers his name under
624  * /ContainerManager/Id directory in NamingService
625  */
626 //=============================================================================
627
628
629 long SALOME_ContainerManager::GetIdForContainer(void)
630 {
631   _id++;
632   return _id;
633 }
634