#include <sstream>
#include <string>
#include <queue>
+#include <thread>
+#include <chrono>
#include <SALOMEconfig.h>
#include CORBA_CLIENT_HEADER(SALOME_Session)
const int SALOME_ContainerManager::TIME_OUT_TO_LAUNCH_CONT=60;
-const char *SALOME_ContainerManager::_ContainerManagerNameInNS =
- "/ContainerManager";
+const int SALOME_ContainerManager::DFT_DELTA_TIME_NS_LOOKUP_IN_MS=1000;
+
+const char *SALOME_ContainerManager::_ContainerManagerNameInNS = "/ContainerManager";
omni_mutex SALOME_ContainerManager::_numInstanceMutex;
//=============================================================================
SALOME_ContainerManager::SALOME_ContainerManager(CORBA::ORB_ptr orb, PortableServer::POA_var poa, SALOME_NamingService_Abstract *ns)
- : _nbprocUsed(1)
+ : _nbprocUsed(1),_delta_time_ns_lookup_in_ms(DFT_DELTA_TIME_NS_LOOKUP_IN_MS)
{
MESSAGE("constructor");
_NS = ns;
_resManager = new SALOME_ResourcesManager_Client(ns);
+ _time_out_in_second = GetTimeOutToLoaunchServer();
PortableServer::POAManager_var pman = poa->the_POAManager();
_orb = CORBA::ORB::_duplicate(orb) ;
_poa->deactivate_object(oid);
}
+CORBA::Long SALOME_ContainerManager::GetTimeOutToLaunchServerInSecond()
+{
+ return this->_time_out_in_second;
+}
+
+void SALOME_ContainerManager::SetTimeOutToLaunchServerInSecond(CORBA::Long timeInSecond)
+{
+ this->_time_out_in_second = timeInSecond;
+}
+
+CORBA::Long SALOME_ContainerManager::GetDeltaTimeBetweenNSLookupAtLaunchTimeInMilliSecond()
+{
+ return this->_delta_time_ns_lookup_in_ms;
+}
+
+void SALOME_ContainerManager::SetDeltaTimeBetweenNSLookupAtLaunchTimeInMilliSecond(CORBA::Long timeInMS)
+{
+ this->_delta_time_ns_lookup_in_ms = timeInMS;
+}
+
//=============================================================================
//! Loop on all the containers listed in naming service, ask shutdown on each
/*! CORBA Method:
else
{
// Step 4: Wait for the container
- int count(GetTimeOutToLoaunchServer());
- INFOS("[GiveContainer] waiting " << count << " second steps container " << containerNameInNS);
+ double nbTurn = ( (double)this->_time_out_in_second ) * ( 1000.0 / ( (double) this->_delta_time_ns_lookup_in_ms) );
+ int count( (int)nbTurn );
+ INFOS("[GiveContainer] # attempts : " << count << " name in NS : \"" << containerNameInNS << "\"");
+ INFOS("[GiveContainer] # attempts : Time in second before time out : " << this->_time_out_in_second << " Delta time in ms between NS lookup : " << this->_delta_time_ns_lookup_in_ms);
while (CORBA::is_nil(ret) && count)
{
- SleepInSecond(1);
+ std::this_thread::sleep_for(std::chrono::milliseconds(_delta_time_ns_lookup_in_ms));
count--;
MESSAGE("[GiveContainer] step " << count << " Waiting for container on " << resource_selected << " with entry in NS = \"" << containerNameInNS << "\"" );
CORBA::Object_var obj(_NS->Resolve(containerNameInNS.c_str()));
#endif
}
+/*!
+ * Return in second the time out to give chance to server to be launched and
+ * to register into NS
+ */
int SALOME_ContainerManager::GetTimeOutToLoaunchServer()
{
int count(TIME_OUT_TO_LAUNCH_CONT);
#include "Launcher.hxx"
#include "ResourcesManager.hxx"
+#include <sstream>
+
struct ResourceDefinition_cpp
{
public:
bool can_run_containers;
std::string working_directory;
};
+
+std::shared_ptr<ResourcesManager_cpp> HandleToLocalInstance(const std::string& ptrInStringFrmt)
+{
+ std::istringstream iss(ptrInStringFrmt);
+ void *zePtr(nullptr);
+ iss >> zePtr;
+ std::shared_ptr<ResourcesManager_cpp> *effPtr = reinterpret_cast<std::shared_ptr<ResourcesManager_cpp> *>(zePtr);
+ std::shared_ptr<ResourcesManager_cpp> ret(*effPtr);
+ delete effPtr;
+ return ret;
+}
%}
%include "std_string.i"
public:
ResourcesManager_cpp(const char *xmlFilePath);
std::vector<std::string> GetFittingResources(const resourceParams& params);
+ void WriteInXmlFile(std::string xml_file);
+ void DeleteAllResourcesInCatalog();
%extend
{
ResourceDefinition_cpp GetResourceDefinition(const std::string& name)
return swig_result;
}
+
+ void DeleteResourceInCatalog(const std::string& name)
+ {
+ $self->DeleteResourceInCatalog(name.c_str());
+ }
+
+ void AddResourceInCatalog (const ResourceDefinition_cpp& new_resource)
+ {
+ ParserResourcesType new_resource_cpp;
+ new_resource_cpp.Name = new_resource.name;
+ new_resource_cpp.HostName = new_resource.hostname;
+ new_resource_cpp.setResourceTypeStr( new_resource.type );
+ new_resource_cpp.setAccessProtocolTypeStr( new_resource.protocol );
+ new_resource_cpp.UserName = new_resource.username;
+ new_resource_cpp.AppliPath = new_resource.applipath;
+ new_resource_cpp.OS = new_resource.OS;
+ new_resource_cpp.DataForSort._memInMB = new_resource.mem_mb;
+ new_resource_cpp.DataForSort._CPUFreqMHz = new_resource.cpu_clock;
+ new_resource_cpp.DataForSort._nbOfNodes = new_resource.nb_node;
+ new_resource_cpp.DataForSort._nbOfProcPerNode = new_resource.nb_proc_per_node;
+ new_resource_cpp.setBatchTypeStr(new_resource.batch);
+ new_resource_cpp.setMpiImplTypeStr(new_resource.mpiImpl);
+ new_resource_cpp.setClusterInternalProtocolStr(new_resource.iprotocol);
+ new_resource_cpp.can_launch_batch_jobs = new_resource.can_launch_batch_jobs;
+ new_resource_cpp.can_run_containers = new_resource.can_run_containers;
+ new_resource_cpp.working_directory = new_resource.working_directory;
+ $self->AddResourceInCatalog(new_resource_cpp);
+ }
+
+ void ParseXmlFiles()
+ {
+ $self->ParseXmlFiles();
+ }
+
+ std::vector<std::string> GetListOfEntries() const
+ {
+ const MapOfParserResourcesType& allRes = $self->GetList();
+ std::vector<std::string> ret;
+ for(auto it : allRes)
+ ret.push_back(it.first);
+ return ret;
+ }
}
};
+%inline
+{
+ std::shared_ptr<ResourcesManager_cpp> HandleToLocalInstance(const std::string& ptrInStringFrmt);
+}
+
%exception
{
try
long createJobWithFile(std::string xmlExecuteFile, std::string clusterName);
void SetResourcesManager(std::shared_ptr<ResourcesManager_cpp>& rm );
};
+
+%pythoncode %{
+def CreateSSHContainerResource(hostname,applipath,nbOfNodes=1):
+ return CreateContainerResource(hostname,applipath,"ssh",nbOfNodes)
+
+def CreateSRUNContainerResource(hostname,applipath,nbOfNodes=1):
+ return CreateContainerResource(hostname,applipath,"srun",nbOfNodes)
+
+def CreateContainerResource(hostname,applipath,protocol,nbOfNodes=1):
+ import getpass
+ ret = ResourceDefinition_cpp()
+ ret.name = hostname.split(".")[0]
+ ret.hostname = ret.name
+ ret.protocol = protocol
+ ret.applipath = applipath
+ ret.nb_node = nbOfNodes
+ ret.nb_proc_per_node = 1
+ ret.can_run_containers = True
+ ret.can_launch_batch_jobs = False
+ ret.mpiImpl = "no mpi"
+ ret.iprotocol = protocol
+ ret.type = "single_machine"
+ ret.username = getpass.getuser()
+ return ret
+
+def ResourceDefinition_cpp_repr(self):
+ pat0 = "{} = {}"
+ pat1 = "{} = \"{}\""
+ data = [("name","name",pat0),
+ ("hostname","hostname",pat0),
+ ("type","type",pat0),
+ ("protocol","protocol",pat0),
+ ("userName","username",pat0),
+ ("appliPath","applipath",pat1),
+ ("mpi","mpiImpl",pat0),
+ ("nbOfNodes","nb_node",pat0),
+ ("nbOfProcPerNode","nb_proc_per_node",pat0),
+ ("canRunContainer","can_run_containers",pat0)
+ ]
+ ret = [c.format(a,getattr(self,b)) for a,b,c in data]
+ return "\n".join( ret )
+
+def ResourcesManager_cpp_GetList(self):
+ return {name:self.GetResourceDefinition(name) for name in self.GetListOfEntries()}
+
+def ResourcesManager_cpp___getitem__(self,name):
+ return self.GetResourceDefinition(name)
+
+def ResourcesManager_cpp___repr__(self):
+ return str( self.GetList() )
+
+def RetrieveRMCppSingleton():
+ import KernelLauncher
+ return HandleToLocalInstance( KernelLauncher.RetrieveInternalInstanceOfLocalCppResourcesManager() )
+
+def GetPlayGroundInsideASlurmJob():
+ import subprocess as sp
+ cont = sp.check_output(["srun","hostname"])
+ nodesMul = [elt for elt in cont.decode().split("\n") if elt != ""]
+ from collections import defaultdict
+ d = defaultdict(int)
+ for elt in nodesMul:
+ d[elt]+=1
+ return d
+
+def BuildCatalogFromScratch(protocol):
+ import os
+ d = GetPlayGroundInsideASlurmJob()
+ rmcpp = RetrieveRMCppSingleton()
+ rmcpp.DeleteAllResourcesInCatalog()
+ for k,v in d.items():
+ contRes = CreateContainerResource(hostname=k,applipath=os.environ["APPLI"],protocol=protocol,nbOfNodes=v)
+ rmcpp.AddResourceInCatalog(contRes)
+
+ResourceDefinition_cpp.repr = ResourceDefinition_cpp_repr
+ResourceDefinition_cpp.__repr__ = ResourceDefinition_cpp_repr
+ResourcesManager_cpp.GetList = ResourcesManager_cpp_GetList
+ResourcesManager_cpp.__getitem__ = ResourcesManager_cpp___getitem__
+ResourcesManager_cpp.__repr__ = ResourcesManager_cpp___repr__
+%}