]> SALOME platform Git repositories - modules/jobmanager.git/blob - src/engine/BL_SALOMEServices.cxx
Salome HOME
Remove distinction between batch managers ssh_batch and none
[modules/jobmanager.git] / src / engine / BL_SALOMEServices.cxx
1 // Copyright (C) 2009-2013  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "BL_SALOMEServices.hxx"
21 #include <sstream>
22
23 static std::ostream & 
24 operator<<(std::ostream & os, const CORBA::Exception & e)
25 {
26   CORBA::Any tmp;
27   tmp <<=e ;
28   CORBA::TypeCode_var tc = tmp.type();
29   const char * p = tc->name ();
30   if (*p != '\0')
31     os << p;
32   else
33     os << tc->id();
34   return os;
35 }
36
37 BL::SALOMEServices::SALOMEServices()
38 {
39   int nbargs = 0; char **args = 0;
40   _orb = CORBA::ORB_init (nbargs, args);
41   _salome_naming_service = NULL;
42   _lcc = NULL;
43   _state = false;
44   _manager = NULL;
45 }
46
47 BL::SALOMEServices::~SALOMEServices() {}
48
49 void
50 BL::SALOMEServices::end()
51 {
52   if (!CORBA::is_nil(_salome_launcher))
53     _salome_launcher->removeObserver(_this());
54   if (_salome_naming_service)
55     delete _salome_naming_service;
56   if (_lcc)
57     delete _lcc;
58 }
59
60 bool
61 BL::SALOMEServices::initNS()
62 {
63   bool return_value = true;
64   _salome_naming_service = new SALOME_NamingService(_orb);
65   _lcc = new SALOME_LifeCycleCORBA(_salome_naming_service);
66   CORBA::Object_var obj = _salome_naming_service->Resolve("/SalomeLauncher");
67   _salome_launcher = Engines::SalomeLauncher::_narrow(obj);
68
69   if (CORBA::is_nil(_salome_launcher))
70   {
71     DEBMSG("SALOME Launcher is not reachable!")
72     return_value = false;
73   }
74   _salome_launcher->addObserver(_this());
75   _remove_ref(); // POA will automatically destroy the object
76
77   obj = _salome_naming_service->Resolve("/ResourcesManager");
78   _resources_manager = Engines::ResourcesManager::_narrow(obj);
79   if (CORBA::is_nil(_resources_manager))
80   {
81     DEBMSG("SALOME Resource Manager is not reachable !");
82     return_value = false;
83   }
84
85   _state = return_value;
86   return return_value;
87 }
88
89 std::list<std::string> 
90 BL::SALOMEServices::getResourceList(bool batch_only)
91 {
92   std::list<std::string> resource_list;
93   
94   if (_state)
95   {
96     Engines::ResourceParameters params;
97     _lcc->preSet(params);
98     params.can_launch_batch_jobs = batch_only;
99     Engines::ResourceList * resourceList = NULL;
100     try
101     {
102       resourceList = _resources_manager->GetFittingResources(params);
103     }
104     catch (const SALOME::SALOME_Exception & ex)
105     {
106       DEBMSG("SALOME Exception in addResource ! " << ex.details.text.in());
107     }
108     catch (const CORBA::SystemException& ex)
109     {
110       DEBMSG("Receive SALOME System Exception: " << ex);
111       DEBMSG("Check SALOME servers...");
112     }
113     if (resourceList)
114     {
115       for (int i = 0; i < resourceList->length(); i++)
116       {
117         const char* aResource = (*resourceList)[i];
118         resource_list.push_back(aResource);
119       }
120       delete resourceList;
121     }
122   }
123   return resource_list;
124 }
125
126 BL::ResourceDescr
127 BL::SALOMEServices::getResourceDescr(const std::string& name)
128 {
129   Engines::ResourceDefinition * resource_definition = NULL;
130   BL::ResourceDescr resource_descr;
131
132   try 
133   {
134     resource_definition = _resources_manager-> GetResourceDefinition(name.c_str());
135   }
136   catch (const SALOME::SALOME_Exception & ex)
137   {
138     DEBMSG("SALOME Exception in addResource ! " << ex.details.text.in());
139   }
140   catch (const CORBA::SystemException& ex)
141   {
142     DEBMSG("Receive SALOME System Exception: " << ex);
143     DEBMSG("Check SALOME servers...");
144   }
145
146   if(resource_definition)
147   {
148     resource_descr.name = resource_definition->name.in();
149     resource_descr.hostname = resource_definition->hostname.in();
150     resource_descr.protocol = resource_definition->protocol.in();
151     resource_descr.username = resource_definition->username.in();
152     resource_descr.applipath = resource_definition->applipath.in();
153     for (int i = 0; i < resource_definition->componentList.length(); i++)
154     {
155       resource_descr.componentList.push_back(resource_definition->componentList[i].in());
156     }
157
158     resource_descr.OS = resource_definition->OS.in();
159     resource_descr.mem_mb = resource_definition->mem_mb;
160     resource_descr.cpu_clock = resource_definition->cpu_clock;
161     resource_descr.nb_node = resource_definition->nb_node;
162     resource_descr.nb_proc_per_node = resource_definition->nb_proc_per_node;
163     resource_descr.batch = resource_definition->batch.in();
164     resource_descr.mpiImpl = resource_definition->mpiImpl.in();
165     resource_descr.iprotocol = resource_definition->iprotocol.in();
166     resource_descr.can_launch_batch_jobs = resource_definition->can_launch_batch_jobs;
167     resource_descr.can_run_containers = resource_definition->can_run_containers;
168     resource_descr.working_directory = resource_definition->working_directory.in();
169
170     delete resource_definition;
171   }
172   return resource_descr;
173 }
174
175 void
176 BL::SALOMEServices::addResource(BL::ResourceDescr & new_resource)
177 {
178   Engines::ResourceDefinition_var resource_definition = new Engines::ResourceDefinition;
179
180   resource_definition->name = CORBA::string_dup(new_resource.name.c_str());
181   resource_definition->hostname = CORBA::string_dup(new_resource.hostname.c_str());
182   if (new_resource.batch == "none")
183     resource_definition->type = CORBA::string_dup("single_machine");
184   else
185     resource_definition->type = CORBA::string_dup("cluster");
186   resource_definition->protocol = CORBA::string_dup(new_resource.protocol.c_str());
187   resource_definition->username = CORBA::string_dup(new_resource.username.c_str());
188   resource_definition->applipath = CORBA::string_dup(new_resource.applipath.c_str());
189
190   int i = 0;
191   std::list<std::string>::iterator it = new_resource.componentList.begin();
192   resource_definition->componentList.length(new_resource.componentList.size());
193   for(; it != new_resource.componentList.end(); it++)
194   {
195     resource_definition->componentList[i] = CORBA::string_dup((*it).c_str());
196     i++;
197   }
198
199   resource_definition->OS = CORBA::string_dup(new_resource.OS.c_str());
200   resource_definition->mem_mb = new_resource.mem_mb;
201   resource_definition->cpu_clock = new_resource.cpu_clock;
202   resource_definition->nb_node = new_resource.nb_node;
203   resource_definition->nb_proc_per_node = new_resource.nb_proc_per_node;  
204   resource_definition->batch = CORBA::string_dup(new_resource.batch.c_str());
205   resource_definition->mpiImpl = CORBA::string_dup(new_resource.mpiImpl.c_str());
206   resource_definition->iprotocol = CORBA::string_dup(new_resource.iprotocol.c_str());
207   resource_definition->can_launch_batch_jobs = new_resource.can_launch_batch_jobs;
208   resource_definition->can_run_containers = new_resource.can_run_containers;
209   resource_definition->working_directory = CORBA::string_dup(new_resource.working_directory.c_str());
210
211   try
212   {
213     _resources_manager->AddResource(resource_definition, true, "");
214   }
215   catch (const SALOME::SALOME_Exception & ex)
216   {
217     DEBMSG("SALOME Exception in addResource ! " << ex.details.text.in());
218     throw(BL::Exception(ex.details.text.in()));
219   }
220   catch (const CORBA::SystemException& ex)
221   {
222     DEBMSG("Receive SALOME System Exception: " << ex);
223     DEBMSG("Check SALOME servers...");
224     throw(BL::Exception("SALOME System Exception"));
225   }
226 }
227
228 void 
229 BL::SALOMEServices::removeResource(const std::string & name)
230 {
231   try
232   {
233     _resources_manager->RemoveResource(name.c_str(), true, "");
234   }
235   catch (const SALOME::SALOME_Exception & ex)
236   {
237     DEBMSG("SALOME Exception in removeResource ! " << ex.details.text.in());
238   }
239   catch (const CORBA::SystemException& ex)
240   {
241     DEBMSG("Receive SALOME System Exception: " << ex);
242     DEBMSG("Check SALOME servers...");
243   }
244 }
245
246 std::string
247 BL::SALOMEServices::create_job(BL::Job * job)
248 {
249   DEBMSG("Begin of create_job");
250   std::string ret = "";
251   Engines::JobParameters_var job_parameters = new Engines::JobParameters;
252
253   // Job type
254   if (job->getType() == BL::Job::COMMAND)
255   {
256     job_parameters->job_type = CORBA::string_dup("command");
257   }
258   else if (job->getType() == BL::Job::YACS_SCHEMA)
259   {
260     job_parameters->job_type = CORBA::string_dup("yacs_file");
261   }
262   else if (job->getType() == BL::Job::PYTHON_SALOME)
263   {
264     job_parameters->job_type = CORBA::string_dup("python_salome");
265   }
266
267   // Specific parameters
268   if (job->getType() == BL::Job::YACS_SCHEMA)
269   {
270     if (job->getDumpYACSState() > 0)
271     {
272       job_parameters->specific_parameters.length(job_parameters->specific_parameters.length() + 1);
273       std::ostringstream oss;
274       oss << job->getDumpYACSState();
275       Engines::Parameter_var new_parameter = new Engines::Parameter;
276       new_parameter->name = CORBA::string_dup("EnableDumpYACS");
277       new_parameter->value = CORBA::string_dup(oss.str().c_str());
278       job_parameters->specific_parameters[job_parameters->specific_parameters.length() - 1] = new_parameter;
279     }
280   }
281   if (job->getLoadLevelerJobType() != "")
282   {
283     job_parameters->specific_parameters.length(job_parameters->specific_parameters.length() + 1);
284     Engines::Parameter_var new_parameter = new Engines::Parameter;
285     new_parameter->name = CORBA::string_dup("LoalLevelerJobType");
286     new_parameter->value = CORBA::string_dup(job->getLoadLevelerJobType().c_str());
287     job_parameters->specific_parameters[job_parameters->specific_parameters.length() - 1] = new_parameter;
288   }
289
290   // Files
291   job_parameters->job_name = CORBA::string_dup(job->getName().c_str());
292   job_parameters->job_file = CORBA::string_dup(job->getJobFile().c_str());
293   job_parameters->env_file = CORBA::string_dup(job->getEnvFile().c_str());
294   BL::Job::FilesParam files = job->getFilesParameters();
295   std::list<std::string>::iterator it;
296   int i = 0;
297   int j = 0;
298
299   job_parameters->in_files.length(files.input_files_list.size());
300   for (it = files.input_files_list.begin() ; it != files.input_files_list.end(); it++)
301   {
302     job_parameters->in_files[i] = CORBA::string_dup((*it).c_str());
303     i++;
304   }
305
306   job_parameters->out_files.length(files.output_files_list.size());
307   for (it = files.output_files_list.begin() ; it != files.output_files_list.end(); it++)
308   {
309     job_parameters->out_files[j] = CORBA::string_dup((*it).c_str());
310     j++;
311   }
312   job_parameters->local_directory = CORBA::string_dup("");
313   job_parameters->result_directory = CORBA::string_dup(files.result_directory.c_str());
314
315   BL::Job::BatchParam cpp_batch_params =  job->getBatchParameters();
316   job_parameters->work_directory = CORBA::string_dup(cpp_batch_params.batch_directory.c_str());
317
318   // Resource
319   job_parameters->maximum_duration = CORBA::string_dup(cpp_batch_params.maximum_duration.c_str());
320   job_parameters->resource_required.name = CORBA::string_dup(job->getResource().c_str());
321   job_parameters->resource_required.nb_proc = cpp_batch_params.nb_proc;
322   job_parameters->queue = CORBA::string_dup(job->getBatchQueue().c_str());
323
324   // Memory
325   CORBA::Long memory;
326   std::string ram = cpp_batch_params.expected_memory.substr(0,cpp_batch_params.expected_memory.size()-2);
327   std::istringstream iss(ram);
328   iss >> memory;
329   std::string unity = cpp_batch_params.expected_memory.substr(cpp_batch_params.expected_memory.size()-2, 2);
330   if((unity.find("gb") != std::string::npos))
331     memory = memory * 1024;
332   job_parameters->resource_required.mem_mb = memory;
333
334   // Parameters for COORM
335   job_parameters->launcher_file = CORBA::string_dup(cpp_batch_params.launcher_file.c_str());
336   job_parameters->launcher_args = CORBA::string_dup(cpp_batch_params.launcher_args.c_str());
337
338   // Create Job
339   try
340   {
341     int job_id = _salome_launcher->createJob(job_parameters);
342     job->setSalomeLauncherId(job_id);
343   }
344   catch (const SALOME::SALOME_Exception & ex)
345   {
346     DEBMSG("SALOME Exception in createJob !");
347     ret = ex.details.text.in();
348   }
349   catch (const CORBA::SystemException& ex)
350   {
351     DEBMSG("Receive SALOME System Exception: " << ex);
352     DEBMSG("Check SALOME servers...");
353     ret = "SALOME System Exception - see logs";
354   }
355   return ret;
356 }
357
358 std::string
359 BL::SALOMEServices::start_job(BL::Job * job)
360 {
361   std::string ret = "";
362   // Launch Job !
363   try
364   {
365     _salome_launcher->launchJob(job->getSalomeLauncherId());
366   }
367   catch (const SALOME::SALOME_Exception & ex)
368   {
369     DEBMSG("SALOME Exception in launchJob !");
370     ret = ex.details.text.in();
371   }
372   catch (const CORBA::SystemException& ex)
373   {
374     DEBMSG("Receive SALOME System Exception: " << ex);
375     DEBMSG("Check SALOME servers...");
376     ret = "SALOME System Exception - see logs";
377   }
378   return ret;
379 }
380
381 std::string
382 BL::SALOMEServices::refresh_job(BL::Job * job)
383 {
384   std::string ret = "";
385
386   // Refresh Job !
387   try
388   {
389     CORBA::String_var result = _salome_launcher->getJobState(job->getSalomeLauncherId());
390     ret = result.in();
391   }
392   catch (const SALOME::SALOME_Exception & ex)
393   {
394     DEBMSG("SALOME Exception in getJobState !");
395     ret = ex.details.text.in();
396   }
397   catch (const CORBA::SystemException& ex)
398   {
399     DEBMSG("Receive SALOME System Exception: " << ex);
400     DEBMSG("Check SALOME servers...");
401     ret = "SALOME System Exception - see logs";
402   }
403   return ret;
404 }
405
406 std::string
407 BL::SALOMEServices::delete_job(BL::Job * job)
408 {
409   std::string ret = "";
410   // Delete Job !
411   try
412   {
413     _salome_launcher->removeJob(job->getSalomeLauncherId());
414   }
415   catch (const SALOME::SALOME_Exception & ex)
416   {
417     DEBMSG("SALOME Exception in removeJob !");
418     ret = ex.details.text.in();
419   }
420   catch (const CORBA::SystemException& ex)
421   {
422     DEBMSG("Receive SALOME System Exception: " << ex);
423     DEBMSG("Check SALOME servers...");
424     ret = "SALOME System Exception - see logs";
425   }
426   return ret;
427 }
428
429 std::string
430 BL::SALOMEServices::stop_job(BL::Job * job)
431 {
432   std::string ret = "";
433   try
434   {
435     _salome_launcher->stopJob(job->getSalomeLauncherId());
436   }
437   catch (const SALOME::SALOME_Exception & ex)
438   {
439     DEBMSG("SALOME Exception in stopJob !");
440     ret = ex.details.text.in();
441   }
442   catch (const CORBA::SystemException& ex)
443   {
444     DEBMSG("Receive SALOME System Exception: " << ex);
445     DEBMSG("Check SALOME servers...");
446     ret = "SALOME System Exception - see logs";
447   }
448   return ret;
449 }
450
451 std::string
452 BL::SALOMEServices::get_results_job(BL::Job * job)
453 {
454   std::string ret = "";
455
456   BL::Job::FilesParam files = job->getFilesParameters();
457   CORBA::String_var directory = CORBA::string_dup(files.result_directory.c_str());
458
459   // get job results !
460   try
461   {
462     _salome_launcher->getJobResults(job->getSalomeLauncherId(), directory);
463   }
464   catch (const SALOME::SALOME_Exception & ex)
465   {
466     DEBMSG("SALOME Exception in refresh_job !");
467     ret = ex.details.text.in();
468   }
469   catch (const CORBA::SystemException& ex)
470   {
471     DEBMSG("Receive SALOME System Exception: " << ex);
472     DEBMSG("Check SALOME servers...");
473     ret = "SALOME System Exception - see logs";
474   }
475   return ret;
476 }
477
478 // Get names or ids of hosts assigned to the job
479 std::string
480 BL::SALOMEServices::get_assigned_hostnames(BL::Job * job)
481 {
482   std::string ret = "";
483
484   try
485   {
486     CORBA::String_var result = _salome_launcher->getAssignedHostnames(job->getSalomeLauncherId());
487     ret = result.in();
488   }
489   catch (const SALOME::SALOME_Exception & ex)
490   {
491     DEBMSG("SALOME Exception in get_assigned_hostnames !");
492     ret = ex.details.text.in();
493   }
494   catch (const CORBA::SystemException& ex)
495   {
496     DEBMSG("Receive SALOME System Exception: " << ex);
497     DEBMSG("Check SALOME servers...");
498     ret = "SALOME System Exception - see logs";
499   }
500   return ret;
501 }
502
503 std::string
504 BL::SALOMEServices::save_jobs(const std::string & file_name)
505 {
506   CORBA::String_var file = CORBA::string_dup(file_name.c_str());
507   std::string ret = "";
508   try
509   {
510     _salome_launcher->saveJobs(file);
511   }
512   catch (const SALOME::SALOME_Exception & ex)
513   {
514     DEBMSG("SALOME Exception in saveJobs !");
515     ret = ex.details.text.in();
516   }
517   catch (const CORBA::SystemException& ex)
518   {
519     DEBMSG("Receive CORBA System Exception: " << ex);
520     DEBMSG("Check SALOME servers...");
521     ret = "CORBA System Exception - see SALOME logs";
522   }
523   return ret;
524 }
525
526 std::string
527 BL::SALOMEServices::load_jobs(const std::string & file_name)
528 {
529   CORBA::String_var file = CORBA::string_dup(file_name.c_str());
530   std::string ret = "";
531   try
532   {
533     _salome_launcher->loadJobs(file);
534   }
535   catch (const SALOME::SALOME_Exception & ex)
536   {
537     DEBMSG("SALOME Exception in loadJobs !");
538     ret = ex.details.text.in();
539   }
540   catch (const CORBA::SystemException& ex)
541   {
542     DEBMSG("Receive CORBA System Exception: " << ex);
543     DEBMSG("Check SALOME servers...");
544     ret = "CORBA System Exception - see SALOME logs";
545   }
546   return ret;
547 }
548
549 void
550 BL::SALOMEServices::notify(const char* event_name, const char * event_data)
551 {
552   DEBMSG("Launcher event received " << event_name << " " << event_data);
553
554   std::string event(event_name);
555   std::string data(event_data);
556
557   if (event == "SAVE_JOBS")
558   {
559     _manager->launcher_event_save_jobs(data);
560   }
561   else if (event == "LOAD_JOBS")
562   {
563     _manager->launcher_event_load_jobs(data);
564   }
565   else if (event == "NEW_JOB")
566   {
567     _manager->launcher_event_new_job(data);
568   }
569   else if (event == "REMOVE_JOB")
570   {
571     _manager->launcher_event_remove_job(data);
572   }
573   else if (event == "UPDATE_JOB_STATE")
574   {
575     _manager->launcher_event_update_job_state(data);
576   }
577   else
578   {
579     DEBMSG("Unkown launcher event received");
580   }
581 }
582
583 BL::Job * 
584 BL::SALOMEServices::get_new_job(int job_number)
585 {
586   DEBMSG("Start of BL::SALOMEServices::get_new_job");
587   BL::Job * job_return = NULL;
588   Engines::JobParameters * job_parameters = NULL;
589   try
590   {
591     job_parameters = _salome_launcher->getJobParameters(job_number);
592   }
593   catch (const SALOME::SALOME_Exception & ex)
594   {
595     DEBMSG("SALOME Exception in saveJobs !");
596   }
597   catch (const CORBA::SystemException& ex)
598   {
599     DEBMSG("Receive CORBA System Exception: " << ex);
600     DEBMSG("Check SALOME servers...");
601   }
602
603   if (job_parameters)
604   {
605     job_return = new BL::Job();
606     job_return->setSalomeLauncherId(job_number);
607
608     job_return->setName(job_parameters->job_name.in());
609     job_return->setType(job_parameters->job_type.in());
610     job_return->setJobFile(job_parameters->job_file.in());
611     job_return->setEnvFile(job_parameters->env_file.in());
612     job_return->setBatchQueue(job_parameters->queue.in());
613
614     BL::Job::FilesParam param;
615     param.result_directory = job_parameters->result_directory.in();
616     for (CORBA::ULong i = 0; i < job_parameters->in_files.length(); i++)
617       param.input_files_list.push_back(job_parameters->in_files[i].in());
618     for (CORBA::ULong i = 0; i < job_parameters->out_files.length(); i++)
619       param.output_files_list.push_back(job_parameters->out_files[i].in());
620     job_return->setFilesParameters(param);
621
622     BL::Job::BatchParam batch_param;
623     batch_param.batch_directory = job_parameters->work_directory.in();
624     batch_param.maximum_duration = job_parameters->maximum_duration.in();
625     batch_param.nb_proc = job_parameters->resource_required.nb_proc;
626     std::ostringstream mem_stream;
627     mem_stream << job_parameters->resource_required.mem_mb << "mb";
628     batch_param.expected_memory = mem_stream.str();
629
630         // Parameters for COORM
631     batch_param.launcher_file = job_parameters->launcher_file.in();
632     batch_param.launcher_args = job_parameters->launcher_args.in();
633
634     job_return->setBatchParameters(batch_param);
635
636     job_return->setResource(job_parameters->resource_required.name.in());
637
638     // Specific parameters
639     for (CORBA::ULong i = 0; i < job_parameters->specific_parameters.length(); i++)
640     {
641       if (std::string(job_parameters->specific_parameters[i].name.in()) == "EnableDumpYACS")
642       {
643         std::string user_value = job_parameters->specific_parameters[i].value.in();
644         std::istringstream iss(user_value);
645         int value;
646         iss >> value;
647         job_return->setDumpYACSState(value);
648       }
649       if (std::string(job_parameters->specific_parameters[i].name.in()) == "LoalLevelerJobType")
650       {
651         std::string user_value = job_parameters->specific_parameters[i].value.in();
652         job_return->setLoadLevelerJobType(user_value);
653       }
654     }
655
656     // Get current state
657     std::string result_job = job_return->setStringState(refresh_job(job_return));
658     if (result_job != "RefreshError") {}
659     else
660     {
661       // Error in getting state
662       DEBMSG("Error in getting state of the new job!");
663       delete job_return;
664       job_return = NULL;
665     }
666     delete job_parameters;
667   }
668
669   return job_return;
670 }