Salome HOME
Add job parameter 'exclusive' to let the user choose to share nodes or not (EDF issue...
[modules/jobmanager.git] / src / engine / BL_SALOMEServices.cxx
1 // Copyright (C) 2009-2013  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "BL_SALOMEServices.hxx"
21 #include <sstream>
22
23 static std::ostream & 
24 operator<<(std::ostream & os, const CORBA::Exception & e)
25 {
26   CORBA::Any tmp;
27   tmp <<=e ;
28   CORBA::TypeCode_var tc = tmp.type();
29   const char * p = tc->name ();
30   if (*p != '\0')
31     os << p;
32   else
33     os << tc->id();
34   return os;
35 }
36
37 BL::SALOMEServices::SALOMEServices()
38 {
39   int nbargs = 0; char **args = 0;
40   _orb = CORBA::ORB_init (nbargs, args);
41   _salome_naming_service = NULL;
42   _lcc = NULL;
43   _state = false;
44   _manager = NULL;
45 }
46
47 BL::SALOMEServices::~SALOMEServices() {}
48
49 void
50 BL::SALOMEServices::end()
51 {
52   if (!CORBA::is_nil(_salome_launcher))
53     _salome_launcher->removeObserver(_this());
54   if (_salome_naming_service)
55     delete _salome_naming_service;
56   if (_lcc)
57     delete _lcc;
58 }
59
60 bool
61 BL::SALOMEServices::initNS()
62 {
63   bool return_value = true;
64   _salome_naming_service = new SALOME_NamingService(_orb);
65   _lcc = new SALOME_LifeCycleCORBA(_salome_naming_service);
66   CORBA::Object_var obj = _salome_naming_service->Resolve("/SalomeLauncher");
67   _salome_launcher = Engines::SalomeLauncher::_narrow(obj);
68
69   if (CORBA::is_nil(_salome_launcher))
70   {
71     DEBMSG("SALOME Launcher is not reachable!")
72     return_value = false;
73   }
74   _salome_launcher->addObserver(_this());
75   _remove_ref(); // POA will automatically destroy the object
76
77   obj = _salome_naming_service->Resolve("/ResourcesManager");
78   _resources_manager = Engines::ResourcesManager::_narrow(obj);
79   if (CORBA::is_nil(_resources_manager))
80   {
81     DEBMSG("SALOME Resource Manager is not reachable !");
82     return_value = false;
83   }
84
85   _state = return_value;
86   return return_value;
87 }
88
89 std::list<std::string> 
90 BL::SALOMEServices::getResourceList(bool batch_only)
91 {
92   std::list<std::string> resource_list;
93   
94   if (_state)
95   {
96     Engines::ResourceParameters params;
97     _lcc->preSet(params);
98     params.can_launch_batch_jobs = batch_only;
99     Engines::ResourceList * resourceList = NULL;
100     try
101     {
102       resourceList = _resources_manager->GetFittingResources(params);
103     }
104     catch (const SALOME::SALOME_Exception & ex)
105     {
106       DEBMSG("SALOME Exception in addResource ! " << ex.details.text.in());
107     }
108     catch (const CORBA::SystemException& ex)
109     {
110       DEBMSG("Receive SALOME System Exception: " << ex);
111       DEBMSG("Check SALOME servers...");
112     }
113     if (resourceList)
114     {
115       for (int i = 0; i < resourceList->length(); i++)
116       {
117         const char* aResource = (*resourceList)[i];
118         resource_list.push_back(aResource);
119       }
120       delete resourceList;
121     }
122   }
123   return resource_list;
124 }
125
126 BL::ResourceDescr
127 BL::SALOMEServices::getResourceDescr(const std::string& name)
128 {
129   Engines::ResourceDefinition * resource_definition = NULL;
130   BL::ResourceDescr resource_descr;
131
132   try 
133   {
134     resource_definition = _resources_manager-> GetResourceDefinition(name.c_str());
135   }
136   catch (const SALOME::SALOME_Exception & ex)
137   {
138     DEBMSG("SALOME Exception in addResource ! " << ex.details.text.in());
139   }
140   catch (const CORBA::SystemException& ex)
141   {
142     DEBMSG("Receive SALOME System Exception: " << ex);
143     DEBMSG("Check SALOME servers...");
144   }
145
146   if(resource_definition)
147   {
148     resource_descr.name = resource_definition->name.in();
149     resource_descr.hostname = resource_definition->hostname.in();
150     resource_descr.protocol = resource_definition->protocol.in();
151     resource_descr.username = resource_definition->username.in();
152     resource_descr.applipath = resource_definition->applipath.in();
153     for (int i = 0; i < resource_definition->componentList.length(); i++)
154     {
155       resource_descr.componentList.push_back(resource_definition->componentList[i].in());
156     }
157
158     resource_descr.OS = resource_definition->OS.in();
159     resource_descr.mem_mb = resource_definition->mem_mb;
160     resource_descr.cpu_clock = resource_definition->cpu_clock;
161     resource_descr.nb_node = resource_definition->nb_node;
162     resource_descr.nb_proc_per_node = resource_definition->nb_proc_per_node;
163     resource_descr.batch = resource_definition->batch.in();
164     resource_descr.mpiImpl = resource_definition->mpiImpl.in();
165     resource_descr.iprotocol = resource_definition->iprotocol.in();
166     resource_descr.can_launch_batch_jobs = resource_definition->can_launch_batch_jobs;
167     resource_descr.can_run_containers = resource_definition->can_run_containers;
168     resource_descr.working_directory = resource_definition->working_directory.in();
169
170     delete resource_definition;
171   }
172   return resource_descr;
173 }
174
175 void
176 BL::SALOMEServices::addResource(BL::ResourceDescr & new_resource)
177 {
178   Engines::ResourceDefinition_var resource_definition = new Engines::ResourceDefinition;
179
180   resource_definition->name = CORBA::string_dup(new_resource.name.c_str());
181   resource_definition->hostname = CORBA::string_dup(new_resource.hostname.c_str());
182   if (new_resource.batch == "none")
183     resource_definition->type = CORBA::string_dup("single_machine");
184   else
185     resource_definition->type = CORBA::string_dup("cluster");
186   resource_definition->protocol = CORBA::string_dup(new_resource.protocol.c_str());
187   resource_definition->username = CORBA::string_dup(new_resource.username.c_str());
188   resource_definition->applipath = CORBA::string_dup(new_resource.applipath.c_str());
189
190   int i = 0;
191   std::list<std::string>::iterator it = new_resource.componentList.begin();
192   resource_definition->componentList.length(new_resource.componentList.size());
193   for(; it != new_resource.componentList.end(); it++)
194   {
195     resource_definition->componentList[i] = CORBA::string_dup((*it).c_str());
196     i++;
197   }
198
199   resource_definition->OS = CORBA::string_dup(new_resource.OS.c_str());
200   resource_definition->mem_mb = new_resource.mem_mb;
201   resource_definition->cpu_clock = new_resource.cpu_clock;
202   resource_definition->nb_node = new_resource.nb_node;
203   resource_definition->nb_proc_per_node = new_resource.nb_proc_per_node;  
204   resource_definition->batch = CORBA::string_dup(new_resource.batch.c_str());
205   resource_definition->mpiImpl = CORBA::string_dup(new_resource.mpiImpl.c_str());
206   resource_definition->iprotocol = CORBA::string_dup(new_resource.iprotocol.c_str());
207   resource_definition->can_launch_batch_jobs = new_resource.can_launch_batch_jobs;
208   resource_definition->can_run_containers = new_resource.can_run_containers;
209   resource_definition->working_directory = CORBA::string_dup(new_resource.working_directory.c_str());
210
211   try
212   {
213     _resources_manager->AddResource(resource_definition, true, "");
214   }
215   catch (const SALOME::SALOME_Exception & ex)
216   {
217     DEBMSG("SALOME Exception in addResource ! " << ex.details.text.in());
218     throw(BL::Exception(ex.details.text.in()));
219   }
220   catch (const CORBA::SystemException& ex)
221   {
222     DEBMSG("Receive SALOME System Exception: " << ex);
223     DEBMSG("Check SALOME servers...");
224     throw(BL::Exception("SALOME System Exception"));
225   }
226 }
227
228 void 
229 BL::SALOMEServices::removeResource(const std::string & name)
230 {
231   try
232   {
233     _resources_manager->RemoveResource(name.c_str(), true, "");
234   }
235   catch (const SALOME::SALOME_Exception & ex)
236   {
237     DEBMSG("SALOME Exception in removeResource ! " << ex.details.text.in());
238   }
239   catch (const CORBA::SystemException& ex)
240   {
241     DEBMSG("Receive SALOME System Exception: " << ex);
242     DEBMSG("Check SALOME servers...");
243   }
244 }
245
246 std::string
247 BL::SALOMEServices::create_job(BL::Job * job)
248 {
249   DEBMSG("Begin of create_job");
250   std::string ret = "";
251   Engines::JobParameters_var job_parameters = new Engines::JobParameters;
252
253   // Job type
254   if (job->getType() == BL::Job::COMMAND)
255   {
256     job_parameters->job_type = CORBA::string_dup("command");
257   }
258   else if (job->getType() == BL::Job::YACS_SCHEMA)
259   {
260     job_parameters->job_type = CORBA::string_dup("yacs_file");
261   }
262   else if (job->getType() == BL::Job::PYTHON_SALOME)
263   {
264     job_parameters->job_type = CORBA::string_dup("python_salome");
265   }
266
267   // Specific parameters
268   if (job->getType() == BL::Job::YACS_SCHEMA)
269   {
270     if (job->getDumpYACSState() > 0)
271     {
272       job_parameters->specific_parameters.length(job_parameters->specific_parameters.length() + 1);
273       std::ostringstream oss;
274       oss << job->getDumpYACSState();
275       Engines::Parameter_var new_parameter = new Engines::Parameter;
276       new_parameter->name = CORBA::string_dup("EnableDumpYACS");
277       new_parameter->value = CORBA::string_dup(oss.str().c_str());
278       job_parameters->specific_parameters[job_parameters->specific_parameters.length() - 1] = new_parameter;
279     }
280   }
281   if (job->getLoadLevelerJobType() != "")
282   {
283     job_parameters->specific_parameters.length(job_parameters->specific_parameters.length() + 1);
284     Engines::Parameter_var new_parameter = new Engines::Parameter;
285     new_parameter->name = CORBA::string_dup("LoalLevelerJobType");
286     new_parameter->value = CORBA::string_dup(job->getLoadLevelerJobType().c_str());
287     job_parameters->specific_parameters[job_parameters->specific_parameters.length() - 1] = new_parameter;
288   }
289
290   // Files
291   job_parameters->job_name = CORBA::string_dup(job->getName().c_str());
292   job_parameters->job_file = CORBA::string_dup(job->getJobFile().c_str());
293   job_parameters->env_file = CORBA::string_dup(job->getEnvFile().c_str());
294   BL::Job::FilesParam files = job->getFilesParameters();
295   std::list<std::string>::iterator it;
296   int i = 0;
297   int j = 0;
298
299   job_parameters->in_files.length(files.input_files_list.size());
300   for (it = files.input_files_list.begin() ; it != files.input_files_list.end(); it++)
301   {
302     job_parameters->in_files[i] = CORBA::string_dup((*it).c_str());
303     i++;
304   }
305
306   job_parameters->out_files.length(files.output_files_list.size());
307   for (it = files.output_files_list.begin() ; it != files.output_files_list.end(); it++)
308   {
309     job_parameters->out_files[j] = CORBA::string_dup((*it).c_str());
310     j++;
311   }
312   job_parameters->local_directory = CORBA::string_dup("");
313   job_parameters->result_directory = CORBA::string_dup(files.result_directory.c_str());
314
315   BL::Job::BatchParam cpp_batch_params =  job->getBatchParameters();
316   job_parameters->work_directory = CORBA::string_dup(cpp_batch_params.batch_directory.c_str());
317
318   // Resource
319   job_parameters->maximum_duration = CORBA::string_dup(cpp_batch_params.maximum_duration.c_str());
320   job_parameters->resource_required.name = CORBA::string_dup(job->getResource().c_str());
321   job_parameters->resource_required.nb_proc = cpp_batch_params.nb_proc;
322   job_parameters->queue = CORBA::string_dup(job->getBatchQueue().c_str());
323   job_parameters->exclusive = cpp_batch_params.exclusive;
324
325   // Memory
326   CORBA::Long memory;
327   std::string ram = cpp_batch_params.expected_memory.substr(0,cpp_batch_params.expected_memory.size()-2);
328   std::istringstream iss(ram);
329   iss >> memory;
330   std::string unity = cpp_batch_params.expected_memory.substr(cpp_batch_params.expected_memory.size()-2, 2);
331   if((unity.find("gb") != std::string::npos))
332     memory = memory * 1024;
333   job_parameters->resource_required.mem_mb = memory;
334
335   // Parameters for COORM
336   job_parameters->launcher_file = CORBA::string_dup(cpp_batch_params.launcher_file.c_str());
337   job_parameters->launcher_args = CORBA::string_dup(cpp_batch_params.launcher_args.c_str());
338
339   // Create Job
340   try
341   {
342     int job_id = _salome_launcher->createJob(job_parameters);
343     job->setSalomeLauncherId(job_id);
344   }
345   catch (const SALOME::SALOME_Exception & ex)
346   {
347     DEBMSG("SALOME Exception in createJob !");
348     ret = ex.details.text.in();
349   }
350   catch (const CORBA::SystemException& ex)
351   {
352     DEBMSG("Receive SALOME System Exception: " << ex);
353     DEBMSG("Check SALOME servers...");
354     ret = "SALOME System Exception - see logs";
355   }
356   return ret;
357 }
358
359 std::string
360 BL::SALOMEServices::start_job(BL::Job * job)
361 {
362   std::string ret = "";
363   // Launch Job !
364   try
365   {
366     _salome_launcher->launchJob(job->getSalomeLauncherId());
367   }
368   catch (const SALOME::SALOME_Exception & ex)
369   {
370     DEBMSG("SALOME Exception in launchJob !");
371     ret = ex.details.text.in();
372   }
373   catch (const CORBA::SystemException& ex)
374   {
375     DEBMSG("Receive SALOME System Exception: " << ex);
376     DEBMSG("Check SALOME servers...");
377     ret = "SALOME System Exception - see logs";
378   }
379   return ret;
380 }
381
382 std::string
383 BL::SALOMEServices::refresh_job(BL::Job * job)
384 {
385   std::string ret = "";
386
387   // Refresh Job !
388   try
389   {
390     CORBA::String_var result = _salome_launcher->getJobState(job->getSalomeLauncherId());
391     ret = result.in();
392   }
393   catch (const SALOME::SALOME_Exception & ex)
394   {
395     DEBMSG("SALOME Exception in getJobState !");
396     ret = ex.details.text.in();
397   }
398   catch (const CORBA::SystemException& ex)
399   {
400     DEBMSG("Receive SALOME System Exception: " << ex);
401     DEBMSG("Check SALOME servers...");
402     ret = "SALOME System Exception - see logs";
403   }
404   return ret;
405 }
406
407 std::string
408 BL::SALOMEServices::delete_job(BL::Job * job)
409 {
410   std::string ret = "";
411   // Delete Job !
412   try
413   {
414     _salome_launcher->removeJob(job->getSalomeLauncherId());
415   }
416   catch (const SALOME::SALOME_Exception & ex)
417   {
418     DEBMSG("SALOME Exception in removeJob !");
419     ret = ex.details.text.in();
420   }
421   catch (const CORBA::SystemException& ex)
422   {
423     DEBMSG("Receive SALOME System Exception: " << ex);
424     DEBMSG("Check SALOME servers...");
425     ret = "SALOME System Exception - see logs";
426   }
427   return ret;
428 }
429
430 std::string
431 BL::SALOMEServices::stop_job(BL::Job * job)
432 {
433   std::string ret = "";
434   try
435   {
436     _salome_launcher->stopJob(job->getSalomeLauncherId());
437   }
438   catch (const SALOME::SALOME_Exception & ex)
439   {
440     DEBMSG("SALOME Exception in stopJob !");
441     ret = ex.details.text.in();
442   }
443   catch (const CORBA::SystemException& ex)
444   {
445     DEBMSG("Receive SALOME System Exception: " << ex);
446     DEBMSG("Check SALOME servers...");
447     ret = "SALOME System Exception - see logs";
448   }
449   return ret;
450 }
451
452 std::string
453 BL::SALOMEServices::get_results_job(BL::Job * job)
454 {
455   std::string ret = "";
456
457   BL::Job::FilesParam files = job->getFilesParameters();
458   CORBA::String_var directory = CORBA::string_dup(files.result_directory.c_str());
459
460   // get job results !
461   try
462   {
463     _salome_launcher->getJobResults(job->getSalomeLauncherId(), directory);
464   }
465   catch (const SALOME::SALOME_Exception & ex)
466   {
467     DEBMSG("SALOME Exception in refresh_job !");
468     ret = ex.details.text.in();
469   }
470   catch (const CORBA::SystemException& ex)
471   {
472     DEBMSG("Receive SALOME System Exception: " << ex);
473     DEBMSG("Check SALOME servers...");
474     ret = "SALOME System Exception - see logs";
475   }
476   return ret;
477 }
478
479 // Get names or ids of hosts assigned to the job
480 std::string
481 BL::SALOMEServices::get_assigned_hostnames(BL::Job * job)
482 {
483   std::string ret = "";
484
485   try
486   {
487     CORBA::String_var result = _salome_launcher->getAssignedHostnames(job->getSalomeLauncherId());
488     ret = result.in();
489   }
490   catch (const SALOME::SALOME_Exception & ex)
491   {
492     DEBMSG("SALOME Exception in get_assigned_hostnames !");
493     ret = ex.details.text.in();
494   }
495   catch (const CORBA::SystemException& ex)
496   {
497     DEBMSG("Receive SALOME System Exception: " << ex);
498     DEBMSG("Check SALOME servers...");
499     ret = "SALOME System Exception - see logs";
500   }
501   return ret;
502 }
503
504 std::string
505 BL::SALOMEServices::save_jobs(const std::string & file_name)
506 {
507   CORBA::String_var file = CORBA::string_dup(file_name.c_str());
508   std::string ret = "";
509   try
510   {
511     _salome_launcher->saveJobs(file);
512   }
513   catch (const SALOME::SALOME_Exception & ex)
514   {
515     DEBMSG("SALOME Exception in saveJobs !");
516     ret = ex.details.text.in();
517   }
518   catch (const CORBA::SystemException& ex)
519   {
520     DEBMSG("Receive CORBA System Exception: " << ex);
521     DEBMSG("Check SALOME servers...");
522     ret = "CORBA System Exception - see SALOME logs";
523   }
524   return ret;
525 }
526
527 std::string
528 BL::SALOMEServices::load_jobs(const std::string & file_name)
529 {
530   CORBA::String_var file = CORBA::string_dup(file_name.c_str());
531   std::string ret = "";
532   try
533   {
534     _salome_launcher->loadJobs(file);
535   }
536   catch (const SALOME::SALOME_Exception & ex)
537   {
538     DEBMSG("SALOME Exception in loadJobs !");
539     ret = ex.details.text.in();
540   }
541   catch (const CORBA::SystemException& ex)
542   {
543     DEBMSG("Receive CORBA System Exception: " << ex);
544     DEBMSG("Check SALOME servers...");
545     ret = "CORBA System Exception - see SALOME logs";
546   }
547   return ret;
548 }
549
550 void
551 BL::SALOMEServices::notify(const char* event_name, const char * event_data)
552 {
553   DEBMSG("Launcher event received " << event_name << " " << event_data);
554
555   std::string event(event_name);
556   std::string data(event_data);
557
558   if (event == "SAVE_JOBS")
559   {
560     _manager->launcher_event_save_jobs(data);
561   }
562   else if (event == "LOAD_JOBS")
563   {
564     _manager->launcher_event_load_jobs(data);
565   }
566   else if (event == "NEW_JOB")
567   {
568     _manager->launcher_event_new_job(data);
569   }
570   else if (event == "REMOVE_JOB")
571   {
572     _manager->launcher_event_remove_job(data);
573   }
574   else if (event == "UPDATE_JOB_STATE")
575   {
576     _manager->launcher_event_update_job_state(data);
577   }
578   else
579   {
580     DEBMSG("Unkown launcher event received");
581   }
582 }
583
584 BL::Job * 
585 BL::SALOMEServices::get_new_job(int job_number)
586 {
587   DEBMSG("Start of BL::SALOMEServices::get_new_job");
588   BL::Job * job_return = NULL;
589   Engines::JobParameters * job_parameters = NULL;
590   try
591   {
592     job_parameters = _salome_launcher->getJobParameters(job_number);
593   }
594   catch (const SALOME::SALOME_Exception & ex)
595   {
596     DEBMSG("SALOME Exception in saveJobs !");
597   }
598   catch (const CORBA::SystemException& ex)
599   {
600     DEBMSG("Receive CORBA System Exception: " << ex);
601     DEBMSG("Check SALOME servers...");
602   }
603
604   if (job_parameters)
605   {
606     job_return = new BL::Job();
607     job_return->setSalomeLauncherId(job_number);
608
609     job_return->setName(job_parameters->job_name.in());
610     job_return->setType(job_parameters->job_type.in());
611     job_return->setJobFile(job_parameters->job_file.in());
612     job_return->setEnvFile(job_parameters->env_file.in());
613     job_return->setBatchQueue(job_parameters->queue.in());
614
615     BL::Job::FilesParam param;
616     param.result_directory = job_parameters->result_directory.in();
617     for (CORBA::ULong i = 0; i < job_parameters->in_files.length(); i++)
618       param.input_files_list.push_back(job_parameters->in_files[i].in());
619     for (CORBA::ULong i = 0; i < job_parameters->out_files.length(); i++)
620       param.output_files_list.push_back(job_parameters->out_files[i].in());
621     job_return->setFilesParameters(param);
622
623     BL::Job::BatchParam batch_param;
624     batch_param.batch_directory = job_parameters->work_directory.in();
625     batch_param.maximum_duration = job_parameters->maximum_duration.in();
626     batch_param.nb_proc = job_parameters->resource_required.nb_proc;
627     batch_param.exclusive = job_parameters->exclusive;
628     std::ostringstream mem_stream;
629     mem_stream << job_parameters->resource_required.mem_mb << "mb";
630     batch_param.expected_memory = mem_stream.str();
631
632         // Parameters for COORM
633     batch_param.launcher_file = job_parameters->launcher_file.in();
634     batch_param.launcher_args = job_parameters->launcher_args.in();
635
636     job_return->setBatchParameters(batch_param);
637
638     job_return->setResource(job_parameters->resource_required.name.in());
639
640     // Specific parameters
641     for (CORBA::ULong i = 0; i < job_parameters->specific_parameters.length(); i++)
642     {
643       if (std::string(job_parameters->specific_parameters[i].name.in()) == "EnableDumpYACS")
644       {
645         std::string user_value = job_parameters->specific_parameters[i].value.in();
646         std::istringstream iss(user_value);
647         int value;
648         iss >> value;
649         job_return->setDumpYACSState(value);
650       }
651       if (std::string(job_parameters->specific_parameters[i].name.in()) == "LoalLevelerJobType")
652       {
653         std::string user_value = job_parameters->specific_parameters[i].value.in();
654         job_return->setLoadLevelerJobType(user_value);
655       }
656     }
657
658     // Get current state
659     std::string result_job = job_return->setStringState(refresh_job(job_return));
660     if (result_job != "RefreshError") {}
661     else
662     {
663       // Error in getting state
664       DEBMSG("Error in getting state of the new job!");
665       delete job_return;
666       job_return = NULL;
667     }
668     delete job_parameters;
669   }
670
671   return job_return;
672 }