def sendPart(self,n1,n2):
return self.bytesToSend[n1:n2]
+
+class CopyFileFromRemoteCtxMgr:
+ def __init__(self, hostName, fileName):
+ import socket
+ self._remoteHostName = hostName
+ self._fileName = fileName
+ self._isRemote = socket.gethostname() != self._remoteHostName
+
+ def __enter__(self):
+ if not self._isRemote:
+ return
+ dn = os.path.dirname( self._fileName )
+ if not os.path.isdir( dn ):
+ os.mkdir( dn )
+ import subprocess as sp
+ p = sp.Popen(["scp","{}@{}".format(self._remoteHostName,self._fileName),dn])
+ p.communicate()
+
+ def __exit__(self,exctype, exc, tb):
+ if not self._isRemote:
+ return
+ os.unlink( self._fileName )
+
+class BigFileOnDiskBase(abc.ABC):
+ """
+ Base class in charge of managing
+ Copy or share of file accross computation Nodes
+ """
+ def __init__(self, fileName):
+ self._file_name = fileName
+
+ def getFileName(self, value):
+ return self._file_name
+
+ @abc.abstractmethod
+ def get(self, visitor = None):
+ """
+ Method called client side of data.
+ """
+ raise RuntimeError("Not implemented !")
+
+ @abc.abstractmethod
+ def unlink(self):
+ """
+ Method called client side of data.
+ """
+ raise RuntimeError("Not implemented !")
+
+
+class BigFileOnDiskShare(BigFileOnDiskBase):
+ def __init__(self, fileName):
+ super().__init__( fileName )
+
+ def get(self, visitor = None):
+ return GetObjectFromFile( self._filename, visitor )
+
+ def unlink(self):
+ if os.path.exists( self._file_name ):
+ os.unlink( self._file_name )
+
+class BigFileOnDiskSSDNoShare(BigFileOnDiskBase):
+ def __init__(self, fileName):
+ import socket
+ super().__init__( fileName )
+ self._hostname = socket.gethostname()
+
+ def get(self, visitor = None):
+ with CopyFileFromRemoteCtxMgr(self._hostname, self._file_name):
+ return GetObjectFromFile( self._filename, visitor )
+
+ def unlink(self):
+ # TODO
+ pass
+
+BigFileOnDiskClsFromProtocol = { 0 : BigFileOnDiskShare, 1 : BigFileOnDiskSSDNoShare }
DicoForProxyFile = { }
f.write( obj )
def IncrRefInFile(fname):
+ """
+ :param fname:
+ :type fname: str
+ """
if fname in DicoForProxyFile:
DicoForProxyFile[fname] += 1
else:
pass
def DecrRefInFile(fname):
+ """
+ :param fname:
+ :type fname: BigFileOnDiskBase
+ """
if fname not in DicoForProxyFile:
cnt = 1
else:
- cnt = DicoForProxyFile[fname]
- DicoForProxyFile[fname] -= 1
+ cnt = DicoForProxyFile[fname.getFileName()]
+ DicoForProxyFile[fname.getFileName()] -= 1
if cnt == 1:
- del DicoForProxyFile[fname]
+ del DicoForProxyFile[fname.getFileName()]
if cnt == 1:
- if os.path.exists(fname):
- os.unlink( fname )
+ fname.unlink()
pass
def GetBigObjectOnDiskThreshold():
def GetBigObjectDirectory():
import os
- if not KernelBasis.BigObjOnDiskDirectoryDefined():
+ protocol, directory = KernelBasis.GetBigObjOnDiskProtocolAndDirectory()
+ if not directory:
raise RuntimeError("An object of size higher than limit detected and no directory specified to dump it in file !")
- return os.path.expanduser( os.path.expandvars( KernelBasis.GetBigObjOnDiskDirectory() ) )
+ return protocol, os.path.expanduser( os.path.expandvars( KernelBasis.GetBigObjOnDiskDirectory() ) )
def GetBigObjectFileName():
"""
Return a filename in the most secure manner (see tempfile documentation)
"""
import tempfile
- with tempfile.NamedTemporaryFile(dir=GetBigObjectDirectory(),prefix="mem_",suffix=".pckl") as f:
+ protocol, directory = GetBigObjectDirectory()
+ with tempfile.NamedTemporaryFile(dir = directory, prefix="mem_", suffix=".pckl") as f:
ret = f.name
- return ret
+ return BigFileOnDiskClsFromProtocol[protocol]( ret )
class BigObjectOnDiskBase:
def __init__(self, fileName, objSerialized):
DumpInFile( objSerialized, self._filename )
def get(self, visitor = None):
- obj = GetObjectFromFile( self._filename, visitor )
- return obj
+ return self._filename.get(visitor)
def __float__(self):
return float( self.get() )
fileName = GetBigObjectFileName()
if visitor:
visitor.setHDDMem( len(pickleObj) )
- visitor.setFileName(fileName)
+ visitor.setFileName( fileName.getFileName() )
if isinstance( obj, list):
proxyObj = BigObjectOnDiskList( len(obj), fileName, pickleObj )
elif isinstance( obj, tuple):