#include "KernelBasis.hxx"
+#include <sstream>
+#include <stdexcept>
+
static bool DEFAULT_SSL_MODE = true;
static bool GUI_MODE = false;
SALOME_BIG_OBJ_ON_DISK_THRES = newThresholdInByte;
}
+constexpr char SALOME_FILE_BIG_OBJ_DIR_SEP = '@';
+
static std::string SALOME_FILE_BIG_OBJ_DIR;
constexpr int DFT_SALOME_NB_RETRY = 1;
static int SALOME_NB_RETRY = DFT_SALOME_NB_RETRY;
-std::string SALOME::GetBigObjOnDiskDirectory()
+SALOME::BigObjTransferProtocol SALOME::FromIntToBigObjOnDiskProtocol(char protocol)
+{
+ switch( protocol )
+ {
+ case SALOME::SHARED_FILE_SYSTEM_PROTOCOL:
+ return SALOME::BigObjTransferProtocol::SharedFileSystem;
+ case SALOME::SSD_COPY_FILE_SYSTEM_PROTOCOL:
+ return SALOME::BigObjTransferProtocol::SSDCopyFileSystem;
+ default:
+ throw std::runtime_error("FromIntToBigObjOnDiskProtocol unrecognized protocol ! should be in [0,1] !");
+ }
+}
+
+SALOME::BigObjTransferProtocol SALOME::BigObjOnDiskProtocolFromStr(const std::string& protocol)
+{
+ if( protocol == SALOME::SHARED_FILE_SYSTEM_PROTOCOL_STR )
+ return SALOME::BigObjTransferProtocol::SharedFileSystem;
+ if( protocol == SALOME::SSD_COPY_FILE_SYSTEM_PROTOCOL_STR )
+ return SALOME::BigObjTransferProtocol::SSDCopyFileSystem;
+ throw std::runtime_error("BigObjOnDiskProtocolFromStr unrecognized protocol !");
+}
+
+std::string SALOME::BigObjOnDiskProtocolToStr(BigObjTransferProtocol protocol)
+{
+ switch( protocol )
+ {
+ case SALOME::BigObjTransferProtocol::SharedFileSystem:
+ return SALOME::SHARED_FILE_SYSTEM_PROTOCOL_STR;
+ case SALOME::BigObjTransferProtocol::SSDCopyFileSystem:
+ return SALOME::SSD_COPY_FILE_SYSTEM_PROTOCOL_STR;
+ default:
+ throw std::runtime_error("BigObjOnDiskProtocolToStr unrecognized protocol ! should be in [0,1] !");
+ }
+}
+
+/*!
+ * This method returns the protocol of proxy transfert and the directory
+ */
+SALOME::BigObjTransferProtocol SALOME::GetBigObjOnDiskProtocolAndDirectory(std::string& directory)
+{
+ if(SALOME_FILE_BIG_OBJ_DIR.size() < 3)
+ {
+ directory = SALOME_FILE_BIG_OBJ_DIR;
+ return SALOME::BigObjTransferProtocol::SharedFileSystem;
+ }
+ std::string protocol = SALOME_FILE_BIG_OBJ_DIR.substr(0,3);
+ directory = SALOME_FILE_BIG_OBJ_DIR.substr(3);
+ if( protocol[0]!=SALOME_FILE_BIG_OBJ_DIR_SEP || protocol[2]!=SALOME_FILE_BIG_OBJ_DIR_SEP)
+ {
+ directory = SALOME_FILE_BIG_OBJ_DIR;
+ return SALOME::BigObjTransferProtocol::SharedFileSystem;
+ }
+ std::istringstream iss(protocol.substr(1,1)); iss.exceptions(std::istringstream::failbit | std::istringstream::badbit);
+ short iproxyprot = 0;
+ iss >> iproxyprot;
+ return FromIntToBigObjOnDiskProtocol( iproxyprot );
+}
+
+std::string SALOME::GetBigObjOnDiskDirectoryCoarse()
{
return SALOME_FILE_BIG_OBJ_DIR;
}
namespace SALOME
{
+ constexpr char SHARED_FILE_SYSTEM_PROTOCOL = 0;
+ constexpr char SSD_COPY_FILE_SYSTEM_PROTOCOL = 1;
+ const char SHARED_FILE_SYSTEM_PROTOCOL_STR[] = "SharedFileSystem";
+ const char SSD_COPY_FILE_SYSTEM_PROTOCOL_STR[] = "SSDCopyFileSystem";
+
enum class PyExecutionMode { NotSet, InProcess, OutOfProcessNoReplay, OutOfProcessWithReplay, OutOfProcessNoReplayFT, OutOfProcessWithReplayFT };
+
+ enum class BigObjTransferProtocol : char
+ { SharedFileSystem = SHARED_FILE_SYSTEM_PROTOCOL, SSDCopyFileSystem = SSD_COPY_FILE_SYSTEM_PROTOCOL };
+
void BASICS_EXPORT SetPyExecutionMode(PyExecutionMode mode);
void BASICS_EXPORT SetPyExecutionModeStr(const std::string& mode);
std::vector<std::string> BASICS_EXPORT GetAllPyExecutionModes();
PyExecutionMode BASICS_EXPORT GetPyExecutionMode();
int BASICS_EXPORT GetBigObjOnDiskThreshold();
void BASICS_EXPORT SetBigObjOnDiskThreshold(int newThresholdInByte);
- std::string BASICS_EXPORT GetBigObjOnDiskDirectory();
+ std::string BASICS_EXPORT BigObjOnDiskProtocolToStr(BigObjTransferProtocol protocol);
+ BigObjTransferProtocol BASICS_EXPORT BigObjOnDiskProtocolFromStr(const std::string& protocol);
+ BigObjTransferProtocol BASICS_EXPORT FromIntToBigObjOnDiskProtocol(char protocol);
+ BigObjTransferProtocol BASICS_EXPORT GetBigObjOnDiskProtocolAndDirectory(std::string& directory);
+ std::string BASICS_EXPORT GetBigObjOnDiskDirectoryCoarse();
void BASICS_EXPORT SetBigObjOnDiskDirectory(const std::string& directory);
bool BASICS_EXPORT BigObjOnDiskDirectoryDefined();
void BASICS_EXPORT SetNumberOfRetry(int nbRetry);
%rename (HeatMarcel) HeatMarcelSwig;
%rename (GetBigObjOnDiskThreshold) GetBigObjOnDiskThresholdSwig;
%rename (SetBigObjOnDiskThreshold) SetBigObjOnDiskThresholdSwig;
-%rename (GetBigObjOnDiskDirectory) GetBigObjOnDiskDirectorySwig;
+%rename (GetBigObjOnDiskProtocolAndDirectory) GetBigObjOnDiskProtocolAndDirectorySwig;
+%rename (BigObjOnDiskProtocolFromStr) BigObjOnDiskProtocolFromStrSwig;
+%rename (BigObjOnDiskProtocolToStr) BigObjOnDiskProtocolToStrSwig;
%rename (SetBigObjOnDiskDirectory) SetBigObjOnDiskDirectorySwig;
%rename (BigObjOnDiskDirectoryDefined) BigObjOnDiskDirectoryDefinedSwig;
%rename (SetNumberOfRetry) SetNumberOfRetrySwig;
return SALOME::SetBigObjOnDiskThreshold(newThreshold);
}
-std::string GetBigObjOnDiskDirectorySwig()
-{
- return SALOME::GetBigObjOnDiskDirectory();
-}
-
void SetBigObjOnDiskDirectorySwig(const std::string& directory)
{
return SALOME::SetBigObjOnDiskDirectory(directory);
return SALOME::GetNumberOfRetry( );
}
+std::string BigObjOnDiskProtocolToStrSwig( int protocol )
+{
+ return SALOME::BigObjOnDiskProtocolToStr( SALOME::FromIntToBigObjOnDiskProtocol( protocol ) );
+}
+
+int BigObjOnDiskProtocolFromStrSwig(const std::string& protocol)
+{
+ return static_cast<char>( SALOME::BigObjOnDiskProtocolFromStr( protocol ) );
+}
+
+PyObject *GetBigObjOnDiskProtocolAndDirectorySwig()
+{
+ std::string directory;
+ SALOME::BigObjTransferProtocol ret0 = SALOME::GetBigObjOnDiskProtocolAndDirectory(directory);
+ PyObject *ret(PyTuple_New(2));
+ PyTuple_SetItem(ret,0,PyInt_FromLong(static_cast<char>( ret0 ) ));
+ PyTuple_SetItem(ret,1,PyUnicode_FromString(directory.c_str()));
+ return ret;
+}
+
void SetVerbosityLevelSwig(const std::string& level)
{
SetVerbosityLevelStr(level);
def ReadFloatsInFile( fileName ):
ret = ReadFloatsInFileSwig( fileName )
return ret
-%}
\ No newline at end of file
+%}
std::ostringstream envInfo;
std::for_each( _override_env.begin(), _override_env.end(), [&envInfo](const std::pair<std::string,std::string>& p) { envInfo << p.first << " = " << p.second << " "; } );
INFOS("[GiveContainer] container " << containerNameInNS << " override " << envInfo.str());
- cont->set_big_obj_on_disk_directory( SALOME::GetBigObjOnDiskDirectory().c_str() );
+ cont->set_big_obj_on_disk_directory( SALOME::GetBigObjOnDiskDirectoryCoarse().c_str() );
cont->set_big_obj_on_disk_threshold( SALOME::GetBigObjOnDiskThreshold() );
cont->set_number_of_retry( SALOME::GetNumberOfRetry() );
Engines::FieldsDict envCorba;
def sendPart(self,n1,n2):
return self.bytesToSend[n1:n2]
+
+def IsRemote(hostName):
+ import socket
+ return socket.gethostname() != hostName
+
+def RemoveFileSafe( fileName ):
+ if os.path.exists( fileName ):
+ os.unlink( fileName )
+
+def RetrieveRemoteFileLocallyInSameFileName( remoteHostName, fileName):
+ """ To customize"""
+ dn = os.path.dirname( fileName )
+ import subprocess as sp
+ p = sp.Popen(["scp","{}:{}".format(remoteHostName,fileName),dn])
+ p.communicate()
+
+def DestroyRemotely( remoteHostName, fileName):
+ import subprocess as sp
+ p = sp.Popen(["ssh","-qC","-oStrictHostKeyChecking=no","-oBatchMode=yes",remoteHostName,"rm {}".format( fileName )])
+ p.communicate()
+
+class CopyFileFromRemoteCtxMgr:
+ def __init__(self, hostName, fileName):
+ self._remoteHostName = hostName
+ self._fileName = fileName
+ self._isRemote = IsRemote( hostName )
+
+ def __enter__(self):
+ if not self._isRemote:
+ return
+ dn = os.path.dirname( self._fileName )
+ if not os.path.isdir( dn ):
+ os.mkdir( dn )
+ RetrieveRemoteFileLocallyInSameFileName(self._remoteHostName,self._fileName)
+
+ def __exit__(self,exctype, exc, tb):
+ if not self._isRemote:
+ return
+ os.unlink( self._fileName )
+
+class BigFileOnDiskBase(abc.ABC):
+ """
+ Base class in charge of managing
+ Copy or share of file accross computation Nodes
+ """
+ def __init__(self, fileName):
+ self._file_name = fileName
+
+ def getFileName(self):
+ return self._file_name
+
+ @abc.abstractmethod
+ def get(self, visitor = None):
+ """
+ Method called client side of data.
+ """
+ raise RuntimeError("Not implemented !")
+
+ @abc.abstractmethod
+ def unlink(self):
+ """
+ Method called client side of data.
+ """
+ raise RuntimeError("Not implemented !")
+
+
+class BigFileOnDiskShare(BigFileOnDiskBase):
+ def __init__(self, fileName):
+ super().__init__( fileName )
+
+ def get(self, visitor = None):
+ return GetObjectFromFile( self._file_name, visitor )
+
+ def unlink(self):
+ RemoveFileSafe( self._file_name )
+
+class BigFileOnDiskSSDNoShare(BigFileOnDiskBase):
+ def __init__(self, fileName):
+ import socket
+ super().__init__( fileName )
+ # hostname hosting data
+ self._hostname = socket.gethostname()
+
+ def get(self, visitor = None):
+ with CopyFileFromRemoteCtxMgr(self._hostname, self._file_name):
+ return GetObjectFromFile( self._file_name, visitor )
+
+ def unlink(self):
+ if IsRemote( self._hostname ):
+ DestroyRemotely(self._hostname,self._file_name)
+ else:
+ RemoveFileSafe( self._file_name )
+
+BigFileOnDiskClsFromProtocol = { 0 : BigFileOnDiskShare, 1 : BigFileOnDiskSSDNoShare }
DicoForProxyFile = { }
f.write( obj )
def IncrRefInFile(fname):
+ """
+ :param fname:
+ :type fname: str
+ """
if fname in DicoForProxyFile:
DicoForProxyFile[fname] += 1
else:
pass
def DecrRefInFile(fname):
+ """
+ :param fname:
+ :type fname: BigFileOnDiskBase
+ """
if fname not in DicoForProxyFile:
cnt = 1
else:
- cnt = DicoForProxyFile[fname]
- DicoForProxyFile[fname] -= 1
+ cnt = DicoForProxyFile[fname.getFileName()]
+ DicoForProxyFile[fname.getFileName()] -= 1
if cnt == 1:
- del DicoForProxyFile[fname]
+ del DicoForProxyFile[fname.getFileName()]
if cnt == 1:
- if os.path.exists(fname):
- os.unlink( fname )
+ fname.unlink()
pass
def GetBigObjectOnDiskThreshold():
def GetBigObjectDirectory():
import os
- if not KernelBasis.BigObjOnDiskDirectoryDefined():
+ protocol, directory = KernelBasis.GetBigObjOnDiskProtocolAndDirectory()
+ if not directory:
raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
- return os.path.expanduser( os.path.expandvars( KernelBasis.GetBigObjOnDiskDirectory() ) )
+ return protocol, os.path.expanduser( os.path.expandvars( directory ) )
def GetBigObjectFileName():
"""
Return a filename in the most secure manner (see tempfile documentation)
"""
import tempfile
- with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
+ protocol, directory = GetBigObjectDirectory()
+ with tempfile.NamedTemporaryFile(dir = directory, prefix="mem_", suffix=".pckl") as f:
ret = f.name
- return ret
+ return BigFileOnDiskClsFromProtocol[protocol]( ret )
class BigObjectOnDiskBase:
def __init__(self, fileName, objSerialized):
return self._filename
def __dumpIntoFile(self, objSerialized):
- DumpInFile( objSerialized, self._filename )
+ DumpInFile( objSerialized, self._filename.getFileName() )
def get(self, visitor = None):
- obj = GetObjectFromFile( self._filename, visitor )
- return obj
+ return self._filename.get(visitor)
def __float__(self):
return float( self.get() )
fileName = GetBigObjectFileName()
if visitor:
visitor.setHDDMem( len(pickleObj) )
- visitor.setFileName(fileName)
+ visitor.setFileName( fileName.getFileName() )
if isinstance( obj, list):
proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
elif isinstance( obj, tuple):
## Time to test it
script_st = """import os
import KernelBasis
-a = KernelBasis.GetBigObjOnDiskDirectory()
+_,a = KernelBasis.GetBigObjOnDiskProtocolAndDirectory()
b = os.environ["jj"]
c = KernelBasis.GetBigObjOnDiskThreshold()
j = a,b,c"""
ret2 = ret2[0]
ret3 = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret2).data() )
self.assertTrue( isinstance( ret3, SALOME_PyNode.BigObjectOnDiskList ) )
- self.assertTrue( val_for_big_obj == os.path.dirname( ret3.getFileName() ) )# very important part of test
+ self.assertTrue( val_for_big_obj == os.path.dirname( ret3.getFileName().getFileName() ) )# very important part of test
self.assertTrue( ret3.get() == list(range(100)) )
- fn = ret3.getFileName()
+ fn = ret3.getFileName().getFileName()
self.assertTrue( os.path.exists( fn ) )
ret3.unlinkOnDestructor()
del ret3
pyFileContainingCodeOfMonitoring = monitoringParams.pyFileName.filename
logging.debug("Python file containing code of monitoring : {}".format(pyFileContainingCodeOfMonitoring))
val_for_big_obj = str( tmpdirname )
- KernelBasis.SetBigObjOnDiskDirectory( val_for_big_obj )
# Override environement for all containers launched
salome.cm.SetBigObjOnDiskDirectory(val_for_big_obj)
salome.cm.SetBigObjOnDiskThreshold(PROXY_THRES)
raise RuntimeError("Oooops 2")
for fileNameProxyOut in fileNamesProxyOut:
if fileNameProxyOut is not None:
- if os.path.exists(fileNameProxyOut):
+ if os.path.exists(fileNameProxyOut.getFileName()):
raise RuntimeError("Oooops 3")
# execution #2 inside last
script_st2 = """
import gc ; gc.collect()
for fileNameProxyOut in fileNamesProxyOut2:
if fileNameProxyOut is not None:
- if os.path.exists(fileNameProxyOut):
+ if os.path.exists(fileNameProxyOut.getFileName()):
raise RuntimeError("Oooops 3")
#
fname = os.path.join(str( tmpdirname ),"perf.log")
logging.debug( salome.LogManagerLoadFromFile(monitoringFileTwo)[0][1][0].get() )
logging.debug( logManagerInst[0][1][0].get()._input_hdd_mem._data[1]._data[0]._data[0]._hdd_mem ) # important
self.assertTrue( logManagerInst2[0][1][0].get() is None )
- self.assertTrue( logManagerInst[0][1][1].get()._output_hdd_mem._data[0]._file_name == fileNamesProxyOut2[0] )
+ self.assertTrue( logManagerInst[0][1][1].get()._output_hdd_mem._data[0]._file_name == fileNamesProxyOut2[0].getFileName() )
logging.debug( logManagerInst[0][1][1].log() )
# 2 files because a backup file is stored in case of unexpected kill during
self.assertEqual( len( glob.glob("{}*".format(monitoringFile) ) ) , 2 )
pyFileContainingCodeOfMonitoring = monitoringParams.pyFileName.filename
logging.debug("Python file containing code of monitoring : {}".format(pyFileContainingCodeOfMonitoring))
val_for_big_obj = str( tmpdirname )
- KernelBasis.SetBigObjOnDiskDirectory( val_for_big_obj )
salome.cm.SetBigObjOnDiskDirectory(val_for_big_obj)
salome.cm.SetBigObjOnDiskThreshold(PROXY_THRES)
# Override environement for all containers launched
ret = retCoarse[0]
self.assertTrue( isinstance(ret,list) and isinstance(ret[0],str) )
cont.Shutdown()
+
+
+ def testSSDCopyMethod(self):
+ """
+ [EDF30157] : This test focuses on protocol of data using SSD local disks
+ """
+ import gc
+ hostname = "localhost"
+ cp0 = pylauncher.GetRequestForGiveContainer(hostname,"container_test_ssd_0")
+ cp1 = pylauncher.GetRequestForGiveContainer(hostname,"container_test_ssd_1")
+ salome.logm.clear()
+ PROXY_THRES = 1
+ poa = salome.orb.resolve_initial_references("RootPOA")
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ val_for_big_obj = str( tmpdirname )
+ salome.cm.SetBigObjOnDiskDirectory( "@1@{}".format( val_for_big_obj) ) # <- key point is here tell KERNEL that directory is considered as local
+ salome.cm.SetBigObjOnDiskThreshold(PROXY_THRES)
+ salome.cm.SetOverrideEnvForContainersSimple(env = [])
+ salome.cm.SetDeltaTimeBetweenCPUMemMeasureInMilliSecond( 250 )
+ cont0 = salome.cm.GiveContainer(cp0)
+ cont1 = salome.cm.GiveContainer(cp1)
+ #
+ script_st0 = """ret0 = bytes(zeLength)"""
+ #
+ pyscript0 = cont0.createPyScriptNode("testScript0",script_st0)
+ szOfArray = 3000000
+ obj = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["zeLength"],{"zeLength": szOfArray }) ))
+ id_o = poa.activate_object(obj)
+ refPtr = poa.id_to_reference(id_o)
+ pyscript0.executeFirst(refPtr)
+ ret0 = pyscript0.executeSecond(["ret0"])
+ ret0_prxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret0[0]).data() )
+ self.assertTrue( isinstance( ret0_prxy.getFileName(), SALOME_PyNode.BigFileOnDiskSSDNoShare) ) # <- Key point is here
+ self.assertTrue( isinstance( ret0_prxy.get(), bytes ) )
+ self.assertEqual( len(ret0_prxy.get()), szOfArray )
+ ret0_prxy.unlinkOnDestructor()
+ #
+ script_st1 = """ret1 = len(ret0)"""
+ pyscript1 = cont1.createPyScriptNode("testScript1",script_st1)
+ obj1 = SALOME_PyNode.SenderByte_i(poa,pickle.dumps( (["ret0"],{"ret0": ret0_prxy }) ))
+ id_o1 = poa.activate_object(obj1)
+ refPtr1 = poa.id_to_reference(id_o1)
+ pyscript1.executeFirst(refPtr1)
+ ret1 = pyscript1.executeSecond(["ret1"])
+ ret1_prxy = pickle.loads( SALOME_PyNode.SeqByteReceiver(ret1[0]).data() )
+ ret1_prxy.unlinkOnDestructor()
+ self.assertEqual( ret1_prxy.get(), szOfArray )
+ #
+ del ret0_prxy
+ del ret1_prxy
+ #
+ cont0.Shutdown()
+ cont1.Shutdown()
+ gc.collect()
+ self.assertTrue( len( glob.glob( os.path.join(tmpdirname,"*") ) ) == 0 )
pass
if __name__ == '__main__':
d[elt]+=1
return d
-def BuildCatalogFromScratch(protocol):
+def BuildCatalogFromScratch(protocol,appliPath):
import os
d = GetPlayGroundInsideASlurmJob()
rmcpp = RetrieveRMCppSingleton()
rmcpp.DeleteAllResourcesInCatalog()
for k,v in d.items():
- contRes = CreateContainerResource(hostname=k,applipath=os.environ["APPLI"],protocol=protocol,nbOfNodes=v)
- rmcpp.AddResourceInCatalog(contRes)
+ contRes = CreateContainerResource(hostname=k,applipath=appliPath,protocol=protocol,nbOfNodes=v)
+ rmcpp.AddResourceInCatalogNoQuestion(contRes)
def GetRequestForGiveContainer(hostname, contName):
import Engines