Salome HOME
Merge branch 'agy/ParallelContainerLaunch'
[modules/kernel.git] / src / ResourcesManager / SALOME_ResourcesManager.cxx
1 // Copyright (C) 2007-2014  CEA/DEN, EDF R&D, OPEN CASCADE
2 //
3 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
5 //
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
10 //
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // Lesser General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
19 //
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
21 //
22
23 #include "SALOME_ResourcesManager.hxx" 
24 #include "Utils_ExceptHandlers.hxx"
25 #include "Utils_CorbaException.hxx"
26 #include "OpUtil.hxx"
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #ifndef WIN32
31 #include <unistd.h>
32 #else
33 #include <io.h>
34 #include <process.h>
35 #endif
36 #include <fstream>
37 #include <iostream>
38 #include <sstream>
39 #include <string.h>
40 #include <map>
41 #include <list>
42
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include "utilities.h"
46
47 #define MAX_SIZE_FOR_HOSTNAME 256;
48
49 using namespace std;
50
51 const char *SALOME_ResourcesManager::_ResourcesManagerNameInNS = "/ResourcesManager";
52
53 //=============================================================================
54 /*!
55  * just for test
56  */ 
57 //=============================================================================
58
59 SALOME_ResourcesManager::SALOME_ResourcesManager(CORBA::ORB_ptr orb, PortableServer::POA_var poa, SALOME_NamingService *ns, const char *xmlFilePath) : _rm(xmlFilePath)
60 {
61   MESSAGE("SALOME_ResourcesManager constructor");
62   _NS = ns;
63   _orb = CORBA::ORB::_duplicate(orb) ;
64   //
65   PortableServer::POAManager_var pman = poa->the_POAManager();
66   CORBA::PolicyList policies;
67   policies.length(1);
68   PortableServer::ThreadPolicy_var threadPol(poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL));
69   policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol);
70   _poa = poa->create_POA("SingleThreadPOA",pman,policies);
71   threadPol->destroy();
72   //
73   PortableServer::ObjectId_var id = _poa->activate_object(this);
74   CORBA::Object_var obj = _poa->id_to_reference(id);
75   Engines::ResourcesManager_var refContMan = Engines::ResourcesManager::_narrow(obj);
76   _NS->Register(refContMan,_ResourcesManagerNameInNS);
77   MESSAGE("SALOME_ResourcesManager constructor end");
78 }
79
80 //=============================================================================
81 /*!
82  *  Standard constructor, parse resource file.
83  *  - if ${APPLI} exists in environment,
84  *    look for ${HOME}/${APPLI}/CatalogResources.xml
85  *  - else look for default:
86  *    ${KERNEL_ROOT_DIR}/share/salome/resources/kernel/CatalogResources.xml
87  *  - parse XML resource file.
88  */ 
89 //=============================================================================
90
91 SALOME_ResourcesManager::SALOME_ResourcesManager(CORBA::ORB_ptr orb, 
92                                                  PortableServer::POA_var poa, 
93                                                  SALOME_NamingService *ns) : _rm()
94 {
95   MESSAGE("SALOME_ResourcesManager constructor");
96   _NS = ns;
97   _orb = CORBA::ORB::_duplicate(orb) ;
98   //
99   PortableServer::POAManager_var pman = poa->the_POAManager();
100   CORBA::PolicyList policies;
101   policies.length(1);
102   PortableServer::ThreadPolicy_var threadPol(poa->create_thread_policy(PortableServer::SINGLE_THREAD_MODEL));
103   policies[0] = PortableServer::ThreadPolicy::_duplicate(threadPol);
104   _poa = poa->create_POA("SingleThreadPOA",pman,policies);
105   threadPol->destroy();
106   //
107   PortableServer::ObjectId_var id = _poa->activate_object(this);
108   CORBA::Object_var obj = _poa->id_to_reference(id);
109   Engines::ResourcesManager_var refContMan = Engines::ResourcesManager::_narrow(obj);
110   _NS->Register(refContMan,_ResourcesManagerNameInNS);
111
112   MESSAGE("SALOME_ResourcesManager constructor end");
113 }
114
115 //=============================================================================
116 /*!
117  *  Standard Destructor
118  */ 
119 //=============================================================================
120
121 SALOME_ResourcesManager::~SALOME_ResourcesManager()
122 {
123   MESSAGE("SALOME_ResourcesManager destructor");
124 }
125
126
127 //=============================================================================
128 /*! CORBA method:
129  *  shutdown all the containers, then the ContainerManager servant
130  */
131 //=============================================================================
132
133 void SALOME_ResourcesManager::Shutdown()
134 {
135   MESSAGE("Shutdown");
136   _NS->Destroy_Name(_ResourcesManagerNameInNS);
137   PortableServer::ObjectId_var oid = _poa->servant_to_id(this);
138   _poa->deactivate_object(oid);
139 }
140
141 //=============================================================================
142 //! get the name of resources fitting the specified constraints (params)
143 /*!
144  *  If hostname specified, check it is local or known in resources catalog.
145  *
146  *  Else
147  *  - select first machines with corresponding OS (all machines if
148  *    parameter OS empty),
149  *  - then select the sublist of machines on which the component is known
150  *    (if the result is empty, that probably means that the inventory of
151  *    components is probably not done, so give complete list from previous step)
152  */ 
153 //=============================================================================
154
155 Engines::ResourceList *
156 SALOME_ResourcesManager::GetFittingResources(const Engines::ResourceParameters& params)
157 {
158   MESSAGE("ResourcesManager::GetFittingResources");
159   Engines::ResourceList * ret = new Engines::ResourceList;
160
161   // CORBA -> C++
162   resourceParams p;
163   p.name = params.name;
164   p.hostname = params.hostname;
165   p.can_launch_batch_jobs = params.can_launch_batch_jobs;
166   p.can_run_containers = params.can_run_containers;
167   p.OS = params.OS;
168   p.nb_proc = params.nb_proc;
169   p.nb_node = params.nb_node;
170   p.nb_proc_per_node = params.nb_proc_per_node;
171   p.cpu_clock = params.cpu_clock;
172   p.mem_mb = params.mem_mb;
173   for(unsigned int i=0; i<params.componentList.length(); i++)
174     p.componentList.push_back(std::string(params.componentList[i]));
175   for(unsigned int i=0; i<params.resList.length(); i++)
176     p.resourceList.push_back(std::string(params.resList[i]));
177   
178   try
179   {
180     // Call C++ ResourceManager
181     std::vector <std::string> vec = _rm.GetFittingResources(p);
182
183     // C++ -> CORBA
184     ret->length(vec.size());
185     for(unsigned int i=0;i<vec.size();i++)
186       (*ret)[i] = (vec[i]).c_str();
187   }
188   catch(const ResourcesException &ex)
189   {
190     INFOS("Caught exception in GetFittingResources C++:  " << ex.msg);
191     THROW_SALOME_CORBA_EXCEPTION(ex.msg.c_str(),SALOME::BAD_PARAM);
192   }  
193
194   return ret;
195 }
196
197 //=============================================================================
198 /*!
199  *  dynamically obtains the first machines
200  */ 
201 //=============================================================================
202
203 char *
204 SALOME_ResourcesManager::FindFirst(const Engines::ResourceList& listOfResources)
205 {
206   // CORBA -> C++
207   std::vector<std::string> rl;
208   for(unsigned int i=0; i<listOfResources.length(); i++)
209     rl.push_back(std::string(listOfResources[i]));
210
211   return CORBA::string_dup(_rm.Find("first", rl).c_str());
212 }
213
214 char *
215 SALOME_ResourcesManager::Find(const char* policy, const Engines::ResourceList& listOfResources)
216 {
217   // CORBA -> C++
218   std::vector<std::string> rl;
219   for(unsigned int i=0; i<listOfResources.length(); i++)
220     rl.push_back(std::string(listOfResources[i]));
221
222   return CORBA::string_dup(_rm.Find(policy, rl).c_str());
223 }
224
225 Engines::ResourceDefinition* 
226 SALOME_ResourcesManager::GetResourceDefinition(const char * name)
227 {
228   Engines::ResourceDefinition * p_ptr = NULL;
229   try {
230     ParserResourcesType resource = _rm.GetResourcesDescr(name);
231     p_ptr = new Engines::ResourceDefinition;
232
233     p_ptr->name = CORBA::string_dup(resource.Name.c_str());
234     p_ptr->hostname = CORBA::string_dup(resource.HostName.c_str());
235     p_ptr->type = CORBA::string_dup(resource.getResourceTypeStr().c_str());
236     p_ptr->protocol = CORBA::string_dup(resource.getAccessProtocolTypeStr().c_str());
237     p_ptr->iprotocol = CORBA::string_dup(resource.getClusterInternalProtocolStr().c_str());
238     p_ptr->username = CORBA::string_dup(resource.UserName.c_str());
239     p_ptr->applipath = CORBA::string_dup(resource.AppliPath.c_str());
240     p_ptr->componentList.length(resource.ComponentsList.size());
241     for(unsigned int i=0;i<resource.ComponentsList.size();i++)
242       p_ptr->componentList[i] = CORBA::string_dup(resource.ComponentsList[i].c_str());
243     p_ptr->OS = CORBA::string_dup(resource.OS.c_str());
244     p_ptr->mem_mb = resource.DataForSort._memInMB;
245     p_ptr->cpu_clock = resource.DataForSort._CPUFreqMHz;
246     p_ptr->nb_proc_per_node = resource.DataForSort._nbOfProcPerNode;
247     p_ptr->nb_node = resource.DataForSort._nbOfNodes;
248     p_ptr->can_launch_batch_jobs = resource.can_launch_batch_jobs;
249     p_ptr->can_run_containers = resource.can_run_containers;
250     p_ptr->working_directory = CORBA::string_dup(resource.working_directory.c_str());
251     p_ptr->mpiImpl = CORBA::string_dup(resource.getMpiImplTypeStr().c_str());
252     p_ptr->batch = CORBA::string_dup(resource.getBatchTypeStr().c_str());
253   } catch (const exception & ex) {
254     INFOS("Caught exception in GetResourceDefinition: " << ex.what());
255     THROW_SALOME_CORBA_EXCEPTION(ex.what(), SALOME::BAD_PARAM);
256   }
257
258   return p_ptr;
259 }
260
261 void 
262 SALOME_ResourcesManager::AddResource(const Engines::ResourceDefinition& new_resource,
263                                      CORBA::Boolean write,
264                                      const char * xml_file)
265 {
266   try
267   {
268     ParserResourcesType resource;
269     resource.Name = new_resource.name.in();
270     resource.HostName = new_resource.hostname.in();
271     resource.setResourceTypeStr(new_resource.type.in());
272     resource.OS = new_resource.OS.in();
273     resource.AppliPath = new_resource.applipath.in();
274     resource.DataForSort._memInMB = new_resource.mem_mb;
275     resource.DataForSort._CPUFreqMHz = new_resource.cpu_clock;
276     resource.DataForSort._nbOfNodes = new_resource.nb_node;
277     resource.DataForSort._nbOfProcPerNode = new_resource.nb_proc_per_node;
278     resource.UserName = new_resource.username.in();
279     resource.can_launch_batch_jobs = new_resource.can_launch_batch_jobs;
280     resource.can_run_containers = new_resource.can_run_containers;
281     resource.working_directory = new_resource.working_directory.in();
282     resource.setBatchTypeStr(new_resource.batch.in());
283     resource.setMpiImplTypeStr(new_resource.mpiImpl.in());
284     resource.setAccessProtocolTypeStr(new_resource.protocol.in());
285     resource.setClusterInternalProtocolStr(new_resource.iprotocol.in());
286     for (CORBA::ULong i = 0; i < new_resource.componentList.length(); i++)
287       resource.ComponentsList.push_back(new_resource.componentList[i].in());
288
289     _rm.AddResourceInCatalog(resource);
290
291     if (write)
292     {
293       _rm.WriteInXmlFile(std::string(xml_file));
294       _rm.ParseXmlFiles();
295     }
296   }
297   catch (const SALOME_Exception & e)
298   {
299     INFOS("Error in AddResourceInCatalog: " << e);
300     THROW_SALOME_CORBA_EXCEPTION(e.what(), SALOME::BAD_PARAM);
301   }
302 }
303
304 void
305 SALOME_ResourcesManager::RemoveResource(const char * resource_name,
306                                         CORBA::Boolean write,
307                                         const char * xml_file)
308 {
309   try
310   {
311     _rm.DeleteResourceInCatalog(resource_name);
312   }
313   catch (const SALOME_Exception & e)
314   {
315     INFOS("Error in DeleteResourceInCatalog: " << e);
316     THROW_SALOME_CORBA_EXCEPTION(e.what(), SALOME::BAD_PARAM);
317   }
318
319   if (write)
320   {
321     _rm.WriteInXmlFile(std::string(xml_file));
322     _rm.ParseXmlFiles();
323   }
324 }
325
326 std::string 
327 SALOME_ResourcesManager::getMachineFile(std::string resource_name, 
328                                         CORBA::Long nb_procs, 
329                                         std::string parallelLib)
330 {
331   std::string machine_file_name("");
332
333   if (parallelLib == "Dummy")
334   {
335     MESSAGE("[getMachineFile] parallelLib is Dummy");
336     MapOfParserResourcesType resourcesList = _rm.GetList();
337     if (resourcesList.find(resource_name) != resourcesList.end())
338     {
339       ParserResourcesType resource = resourcesList[resource_name];
340
341       // Check if resource is cluster or not
342       if (resource.ClusterMembersList.empty())
343       {
344         //It is not a cluster so we create a cluster with one machine
345         ParserResourcesType fake_node;
346         fake_node.HostName = resource.HostName;
347         fake_node.Protocol = resource.Protocol;
348         fake_node.ClusterInternalProtocol = resource.ClusterInternalProtocol;
349         fake_node.UserName = resource.UserName;
350         fake_node.AppliPath = resource.AppliPath;
351         fake_node.DataForSort = resource.DataForSort;
352
353         resource.ClusterMembersList.push_front(fake_node);
354       }
355
356       // Creating list of machines for creating the machine file
357       std::list<std::string> list_of_machines;
358       std::list<ParserResourcesType>::iterator cluster_it =
359         resource.ClusterMembersList.begin();
360       while (cluster_it != resource.ClusterMembersList.end())
361       {
362         // For each member of the cluster we add a nbOfNodes * nbOfProcPerNode in the list
363         unsigned int number_of_proc = (*cluster_it).DataForSort._nbOfNodes * 
364                                       (*cluster_it).DataForSort._nbOfProcPerNode;
365         for (unsigned int i = 0; i < number_of_proc; i++)
366           list_of_machines.push_back((*cluster_it).HostName);
367         cluster_it++;
368       }
369
370       // Creating machine file
371       machine_file_name = tmpnam(NULL);
372       std::ofstream machine_file(machine_file_name.c_str(), std::ios_base::out);
373
374       CORBA::Long machine_number = 0;
375       std::list<std::string>::iterator it = list_of_machines.begin();
376       while (machine_number != nb_procs)
377       {
378         // Adding a new node to the machine file
379         machine_file << *it << std::endl;
380
381         // counting...
382         it++;
383         if (it == list_of_machines.end())
384           it = list_of_machines.begin();
385         machine_number++;
386       }
387     }
388     else
389       INFOS("[getMachineFile] Error resource_name not found in resourcesList -> " << resource_name);
390   }
391   else if (parallelLib == "Mpi")
392   {
393     MESSAGE("[getMachineFile] parallelLib is Mpi");
394
395     MapOfParserResourcesType resourcesList = _rm.GetList();
396     if (resourcesList.find(resource_name) != resourcesList.end())
397     {
398       ParserResourcesType resource = resourcesList[resource_name];
399       // Check if resource is cluster or not
400       if (resource.ClusterMembersList.empty())
401       {
402         //It is not a cluster so we create a cluster with one machine
403         ParserResourcesType fake_node;
404         fake_node.HostName = resource.HostName;
405         fake_node.Protocol = resource.Protocol;
406         fake_node.ClusterInternalProtocol = resource.ClusterInternalProtocol;
407         fake_node.UserName = resource.UserName;
408         fake_node.AppliPath = resource.AppliPath;
409         fake_node.DataForSort = resource.DataForSort;
410
411         resource.ClusterMembersList.push_front(fake_node);
412       }
413
414       // Choose mpi implementation -> each MPI implementation has is own machinefile...
415       if (resource.mpi == lam)
416       {
417         // Creating machine file
418         machine_file_name = tmpnam(NULL);
419         std::ofstream machine_file(machine_file_name.c_str(), std::ios_base::out);
420
421         // We add all cluster machines to the file
422         std::list<ParserResourcesType>::iterator cluster_it =
423           resource.ClusterMembersList.begin();
424         while (cluster_it != resource.ClusterMembersList.end())
425         {
426           unsigned int number_of_proc = (*cluster_it).DataForSort._nbOfNodes * 
427             (*cluster_it).DataForSort._nbOfProcPerNode;
428           machine_file << (*cluster_it).HostName << " cpu=" << number_of_proc << std::endl;
429           cluster_it++;
430         }
431       }
432       else if ((resource.mpi == openmpi) || (resource.mpi == ompi))
433       {
434         // Creating machine file
435         machine_file_name = tmpnam(NULL);
436         std::ofstream machine_file(machine_file_name.c_str(), std::ios_base::out);
437
438         // We add all cluster machines to the file
439         std::list<ParserResourcesType>::iterator cluster_it =
440           resource.ClusterMembersList.begin();
441         while (cluster_it != resource.ClusterMembersList.end())
442         {
443           unsigned int number_of_proc = (*cluster_it).DataForSort._nbOfNodes *
444             (*cluster_it).DataForSort._nbOfProcPerNode;
445           machine_file << (*cluster_it).HostName << " slots=" << number_of_proc << std::endl;
446           cluster_it++;
447         }
448       }
449       else if (resource.mpi == nompi)
450       {
451         INFOS("[getMachineFile] Error resource_name MPI implementation was defined for " << resource_name);
452       }
453       else
454         INFOS("[getMachineFile] Error resource_name MPI implementation not currenly handled for " << resource_name);
455     }
456     else
457       INFOS("[getMachineFile] Error resource_name not found in resourcesList -> " << resource_name);
458   }
459   else
460     INFOS("[getMachineFile] Error parallelLib is not handled -> " << parallelLib);
461
462   return machine_file_name;
463 }